diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessage.cpp | 59 |
1 files changed, 30 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index 244bee4a92..bddd5802cf 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -72,22 +72,25 @@ BasicMessage::BasicMessage( const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate ) : - Message(_publisher, _exchange, _routingKey, _mandatory, - _immediate, framing::AMQMethodBody::shared_ptr(new BasicPublishBody(ProtocolVersion(0,9)))), + Message(_publisher, _exchange, _routingKey, _mandatory, _immediate), size(0) {} // For tests only. -BasicMessage::BasicMessage() : size(0) -{} +BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {} BasicMessage::~BasicMessage(){} -void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; +void BasicMessage::setHeader(AMQHeaderBody* _header){ + if (_header) { + this->header = *_header; + isHeaderSet = true; + } + else + isHeaderSet = false; } -void BasicMessage::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody* data){ if (!content.get()) { content = std::auto_ptr<Content>(new InMemoryContent()); } @@ -96,7 +99,7 @@ void BasicMessage::addContent(AMQContentBody::shared_ptr data){ } bool BasicMessage::isComplete(){ - return header.get() && (header->getContentSize() == contentSize()); + return isHeaderSet && (header.getContentSize() == contentSize()); } DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) @@ -113,10 +116,9 @@ void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, DeliveryId id, uint32_t framesize) { - channel.send(make_shared_ptr( - new BasicDeliverBody( + channel.send(BasicDeliverBody( channel.getVersion(), consumerTag, id.getValue(), - getRedelivered(), getExchange(), getRoutingKey()))); + getRedelivered(), getExchange(), getRoutingKey())); sendContent(channel, framesize); } @@ -125,11 +127,11 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel, DeliveryId id, uint32_t framesize) { - channel.send(make_shared_ptr( - new BasicGetOkBody( + channel.send( + BasicGetOkBody( channel.getVersion(), id.getValue(), getRedelivered(), getExchange(), - getRoutingKey(), messageCount))); + getRoutingKey(), messageCount)); sendContent(channel, framesize); } @@ -156,12 +158,11 @@ void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) channel.send(header); Mutex::ScopedLock locker(contentLock); if (content.get()) - content->send(channel, framesize); + content->send(channel, framesize); } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); + return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0; } const FieldTable& BasicMessage::getApplicationHeaders(){ @@ -170,7 +171,7 @@ const FieldTable& BasicMessage::getApplicationHeaders(){ bool BasicMessage::isPersistent() { - if(!header) return false; + if(!isHeaderSet) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } @@ -194,9 +195,9 @@ void BasicMessage::decodeHeader(Buffer& buffer) setRouting(exchange, routingKey); uint32_t headerSize = buffer.getLong(); - AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); - headerBody->decode(buffer, headerSize); - setHeader(headerBody); + AMQHeaderBody headerBody; + headerBody.decode(buffer, headerSize); + setHeader(&headerBody); } void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) @@ -214,9 +215,9 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) uint64_t total = 0; while (total < expectedContentSize()) { uint64_t remaining = expected - total; - AMQContentBody::shared_ptr contentBody(new AMQContentBody()); - contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); - addContent(contentBody); + AMQContentBody contentBody; + contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize); + addContent(&contentBody); total += chunkSize; } } @@ -232,8 +233,8 @@ void BasicMessage::encodeHeader(Buffer& buffer) const RecoveryManagerImpl::encodeMessageType(*this, buffer); buffer.putShortString(getExchange()); buffer.putShortString(getRoutingKey()); - buffer.putLong(header->size()); - header->encode(buffer); + buffer.putLong(header.size()); + header.encode(buffer); } void BasicMessage::encodeContent(Buffer& buffer) const @@ -258,12 +259,12 @@ uint32_t BasicMessage::encodedHeaderSize() const return RecoveryManagerImpl::encodedMessageTypeSize() +getExchange().size() + 1 + getRoutingKey().size() + 1 - + header->size() + 4;//4 extra bytes for size + + header.size() + 4;//4 extra bytes for size } uint64_t BasicMessage::expectedContentSize() { - return header.get() ? header->getContentSize() : 0; + return isHeaderSet ? header.getContentSize() : 0; } void BasicMessage::releaseContent(MessageStore* store) @@ -294,5 +295,5 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content) uint32_t BasicMessage::getRequiredCredit() const { - return header->size() + contentSize(); + return header.size() + contentSize(); } |
