diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
| commit | 868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch) | |
| tree | 63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/management | |
| parent | 2e5ff8f1b328831043e6d7e323249d62187234c6 (diff) | |
| download | qpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz | |
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 4 |
3 files changed, 40 insertions, 36 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 7d90ed99d0..474c86ed48 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -31,6 +31,7 @@ #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" @@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, } if (exchange.get() == 0) return; - intrusive_ptr<Message> msg(new Message()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); @@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffer& buf, header.setEof(false); content.setBof(false); - msg->getFrames().append(method); - msg->getFrames().append(header); + transfer->getFrames().append(method); + transfer->getFrames().append(header); MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + transfer->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(length); DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); dp->setRoutingKey(routingKey); - msg->getFrames().append(content); - msg->setIsManagementMessage(true); + transfer->getFrames().append(content); + + Message msg(transfer, transfer); + msg.setIsManagementMessage(true); { sys::Mutex::ScopedUnlock u(userLock); - DeliverableMessage deliverable (msg); + DeliverableMessage deliverable (msg, 0); try { exchange->route(deliverable); } catch(exception&) {} @@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const string& data, } if (exchange.get() == 0) return; - intrusive_ptr<Message> msg(new Message()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody(data))); @@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const string& data, header.setEof(false); content.setBof(false); - msg->getFrames().append(method); - msg->getFrames().append(header); + transfer->getFrames().append(method); + transfer->getFrames().append(header); MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + transfer->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(data.length()); if (!cid.empty()) { props->setCorrelationId(cid); @@ -625,23 +628,25 @@ void ManagementAgent::sendBufferLH(const string& data, props->setAppId("qmf2"); for (i = headers.begin(); i != headers.end(); ++i) { - msg->insertCustomProperty(i->first, i->second.asString()); + props->getApplicationHeaders().setString(i->first, i->second.asString()); } DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->computeExpiration(broker->getExpiryPolicy()); } - msg->getFrames().append(content); - msg->setIsManagementMessage(true); + transfer->getFrames().append(content); + transfer->computeRequiredCredit(); + Message msg(transfer, transfer); + msg.setIsManagementMessage(true); + msg.computeExpiration(broker->getExpiryPolicy()); { sys::Mutex::ScopedUnlock u(userLock); - DeliverableMessage deliverable (msg); + DeliverableMessage deliverable (msg, 0); try { exchange->route(deliverable); } catch(exception&) {} @@ -2135,19 +2140,20 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) // authorized or not. In this case, return true (authorized) if there is no ACL in place, // otherwise return false; // - if (msg.encodedSize() > MA_BUFFER_SIZE) + if (msg.getContentSize() > MA_BUFFER_SIZE) return broker->getAcl() == 0; - msg.encodeContent(inBuffer); + inBuffer.putRawData(msg.getContent()); uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); - const framing::FieldTable *headers = msg.getApplicationHeaders(); + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; - if (headers && msg.getAppId() == "qmf2") + if (headers && p->getAppId() == "qmf2") { mapMsg = true; @@ -2238,8 +2244,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) // authorization failed, send reply if replyTo present + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); string rte = rt.getExchange(); @@ -2277,8 +2284,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) { string rte; string rtk; + qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); rte = rt.getExchange(); @@ -2290,19 +2298,19 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; - if (msg.encodedSize() > MA_BUFFER_SIZE) { + if (msg.getContentSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << - msg.encodedSize()); + msg.getContentSize()); return; } - msg.encodeContent(inBuffer); + inBuffer.putRawData(msg.getContent()); uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher()); - const framing::FieldTable *headers = msg.getApplicationHeaders(); - if (headers && msg.getAppId() == "qmf2") + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; + if (headers && p->getAppId() == "qmf2") { string opcode = headers->getAsString("qmf.opcode"); string contentType = headers->getAsString("qmf.content"); diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp index 9432a21b3a..1c1d6ef3db 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, void ManagementDirectExchange::route(Deliverable& msg) { bool routeIt = true; - const std::string& routingKey = msg.getMessage().getRoutingKey(); - const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion); + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion); if (routeIt) DirectExchange::route(msg); diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index e5b659f217..c8bfef3785 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, void ManagementTopicExchange::route(Deliverable& msg) { bool routeIt = true; - const std::string& routingKey = msg.getMessage().getRoutingKey(); - const FieldTable* args = msg.getMessage().getApplicationHeaders(); // Intercept management agent commands if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion); + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion); if (routeIt) TopicExchange::route(msg); |
