summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
commit868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch)
tree63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/management
parent2e5ff8f1b328831043e6d7e323249d62187234c6 (diff)
downloadqpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp68
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp4
3 files changed, 40 insertions, 36 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 7d90ed99d0..474c86ed48 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -31,6 +31,7 @@
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
@@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
}
if (exchange.get() == 0) return;
- intrusive_ptr<Message> msg(new Message());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
@@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
header.setEof(false);
content.setBof(false);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(length);
DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
dp->setRoutingKey(routingKey);
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
+ transfer->getFrames().append(content);
+
+ Message msg(transfer, transfer);
+ msg.setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
- DeliverableMessage deliverable (msg);
+ DeliverableMessage deliverable (msg, 0);
try {
exchange->route(deliverable);
} catch(exception&) {}
@@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const string& data,
}
if (exchange.get() == 0) return;
- intrusive_ptr<Message> msg(new Message());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer);
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody(data)));
@@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const string& data,
header.setEof(false);
content.setBof(false);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(data.length());
if (!cid.empty()) {
props->setCorrelationId(cid);
@@ -625,23 +628,25 @@ void ManagementAgent::sendBufferLH(const string& data,
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->insertCustomProperty(i->first, i->second.asString());
+ props->getApplicationHeaders().setString(i->first, i->second.asString());
}
DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->computeExpiration(broker->getExpiryPolicy());
}
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
+ transfer->getFrames().append(content);
+ transfer->computeRequiredCredit();
+ Message msg(transfer, transfer);
+ msg.setIsManagementMessage(true);
+ msg.computeExpiration(broker->getExpiryPolicy());
{
sys::Mutex::ScopedUnlock u(userLock);
- DeliverableMessage deliverable (msg);
+ DeliverableMessage deliverable (msg, 0);
try {
exchange->route(deliverable);
} catch(exception&) {}
@@ -2135,19 +2140,20 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
// authorized or not. In this case, return true (authorized) if there is no ACL in place,
// otherwise return false;
//
- if (msg.encodedSize() > MA_BUFFER_SIZE)
+ if (msg.getContentSize() > MA_BUFFER_SIZE)
return broker->getAcl() == 0;
- msg.encodeContent(inBuffer);
+ inBuffer.putRawData(msg.getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
- const framing::FieldTable *headers = msg.getApplicationHeaders();
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
- if (headers && msg.getAppId() == "qmf2")
+ if (headers && p->getAppId() == "qmf2")
{
mapMsg = true;
@@ -2238,8 +2244,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
// authorization failed, send reply if replyTo present
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
string rte = rt.getExchange();
@@ -2277,8 +2284,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
{
string rte;
string rtk;
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
@@ -2290,19 +2298,19 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
+ if (msg.getContentSize() > MA_BUFFER_SIZE) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
- msg.encodedSize());
+ msg.getContentSize());
return;
}
- msg.encodeContent(inBuffer);
+ inBuffer.putRawData(msg.getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
- const framing::FieldTable *headers = msg.getApplicationHeaders();
- if (headers && msg.getAppId() == "qmf2")
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+ if (headers && p->getAppId() == "qmf2")
{
string opcode = headers->getAsString("qmf.opcode");
string contentType = headers->getAsString("qmf.content");
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp
index 9432a21b3a..1c1d6ef3db 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const std::string& routingKey = msg.getMessage().getRoutingKey();
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion);
if (routeIt)
DirectExchange::route(msg);
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
index e5b659f217..c8bfef3785 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const std::string& routingKey = msg.getMessage().getRoutingKey();
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion);
if (routeIt)
TopicExchange::route(msg);