summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp59
1 files changed, 30 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 244bee4a92..bddd5802cf 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -72,22 +72,25 @@ BasicMessage::BasicMessage(
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate
) :
- Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))),
+ Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
size(0)
{}
// For tests only.
-BasicMessage::BasicMessage() : size(0)
-{}
+BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
BasicMessage::~BasicMessage(){}
-void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+void BasicMessage::setHeader(AMQHeaderBody* _header){
+ if (_header) {
+ this->header = *_header;
+ isHeaderSet = true;
+ }
+ else
+ isHeaderSet = false;
}
-void BasicMessage::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody* data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){
}
bool BasicMessage::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
+ return isHeaderSet && (header.getContentSize() == contentSize());
}
DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
@@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicDeliverBody(
+ channel.send(BasicDeliverBody(
channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey())));
+ getRedelivered(), getExchange(), getRoutingKey()));
sendContent(channel, framesize);
}
@@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
DeliveryId id,
uint32_t framesize)
{
- channel.send(make_shared_ptr(
- new BasicGetOkBody(
+ channel.send(
+ BasicGetOkBody(
channel.getVersion(),
id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount)));
+ getRoutingKey(), messageCount));
sendContent(channel, framesize);
}
@@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
channel.send(header);
Mutex::ScopedLock locker(contentLock);
if (content.get())
- content->send(channel, framesize);
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
+ return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
}
const FieldTable& BasicMessage::getApplicationHeaders(){
@@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){
bool BasicMessage::isPersistent()
{
- if(!header) return false;
+ if(!isHeaderSet) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
@@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer)
setRouting(exchange, routingKey);
uint32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
+ AMQHeaderBody headerBody;
+ headerBody.decode(buffer, headerSize);
+ setHeader(&headerBody);
}
void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
@@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
uint64_t total = 0;
while (total < expectedContentSize()) {
uint64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
+ AMQContentBody contentBody;
+ contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(&contentBody);
total += chunkSize;
}
}
@@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const
RecoveryManagerImpl::encodeMessageType(*this, buffer);
buffer.putShortString(getExchange());
buffer.putShortString(getRoutingKey());
- buffer.putLong(header->size());
- header->encode(buffer);
+ buffer.putLong(header.size());
+ header.encode(buffer);
}
void BasicMessage::encodeContent(Buffer& buffer) const
@@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const
return RecoveryManagerImpl::encodedMessageTypeSize()
+getExchange().size() + 1
+ getRoutingKey().size() + 1
- + header->size() + 4;//4 extra bytes for size
+ + header.size() + 4;//4 extra bytes for size
}
uint64_t BasicMessage::expectedContentSize()
{
- return header.get() ? header->getContentSize() : 0;
+ return isHeaderSet ? header.getContentSize() : 0;
}
void BasicMessage::releaseContent(MessageStore* store)
@@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content)
uint32_t BasicMessage::getRequiredCredit() const
{
- return header->size() + contentSize();
+ return header.size() + contentSize();
}