/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include #include #include #include "AMQMethodBody.h" #include "AMQFrame.h" #include "framing/ChannelAdapter.h" using namespace boost; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; BasicMessage::BasicMessage( const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo ) : Message(_publisher, _exchange, _routingKey, _mandatory, _immediate, respondTo), size(0) {} // FIXME aconway 2007-02-01: remove. // BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) : // publisher(0), size(0) // { // decode(buffer, headersOnly, contentChunkSize); // } // For tests only. BasicMessage::BasicMessage() : size(0) {} BasicMessage::~BasicMessage(){} void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } void BasicMessage::addContent(AMQContentBody::shared_ptr data){ if (!content.get()) { content = std::auto_ptr(new InMemoryContent()); } content->add(data); size += data->size(); } bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) { // CCT -- TODO - Update code generator to take pointer/ not // instance to avoid extra contruction channel.send( new BasicDeliverBody( channel.getVersion(), consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())); sendContent(channel, framesize); } void BasicMessage::sendGetOk(const MethodContext& context, const std::string& /*destination*/, uint32_t messageCount, uint64_t deliveryTag, uint32_t framesize) { // CCT -- TODO - Update code generator to take pointer/ not // instance to avoid extra contruction context.channel->send( new BasicGetOkBody( context.channel->getVersion(), context.methodBody->getRequestId(), deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)); sendContent(*context.channel, framesize); } void BasicMessage::sendContent( ChannelAdapter& channel, uint32_t framesize) { channel.send(header); Mutex::ScopedLock locker(contentLock); if (content.get()) content->send(channel, framesize); } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ return boost::polymorphic_downcast( header->getProperties()); } const FieldTable& BasicMessage::getApplicationHeaders(){ return getHeaderProperties()->getHeaders(); } bool BasicMessage::isPersistent() { if(!header) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) { decodeHeader(buffer); if (!headersOnly) decodeContent(buffer, contentChunkSize); } void BasicMessage::decodeHeader(Buffer& buffer) { string exchange; string routingKey; buffer.getShortString(exchange); buffer.getShortString(routingKey); setRouting(exchange, routingKey); uint32_t headerSize = buffer.getLong(); AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); headerBody->decode(buffer, headerSize); setHeader(headerBody); } void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) { uint64_t expected = expectedContentSize(); if (expected != buffer.available()) { std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; throw Exception("Cannot decode content, buffer not large enough."); } if (!chunkSize || chunkSize > expected) { chunkSize = expected; } uint64_t total = 0; while (total < expectedContentSize()) { uint64_t remaining = expected - total; AMQContentBody::shared_ptr contentBody(new AMQContentBody()); contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); addContent(contentBody); total += chunkSize; } } void BasicMessage::encode(Buffer& buffer) { encodeHeader(buffer); encodeContent(buffer); } void BasicMessage::encodeHeader(Buffer& buffer) { buffer.putShortString(getExchange()); buffer.putShortString(getRoutingKey()); buffer.putLong(header->size()); header->encode(buffer); } void BasicMessage::encodeContent(Buffer& buffer) { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } uint32_t BasicMessage::encodedSize() { return encodedHeaderSize() + encodedContentSize(); } uint32_t BasicMessage::encodedContentSize() { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } uint32_t BasicMessage::encodedHeaderSize() { return getExchange().size() + 1 + getRoutingKey().size() + 1 + header->size() + 4;//4 extra bytes for size } uint64_t BasicMessage::expectedContentSize() { return header.get() ? header->getContentSize() : 0; } void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && getPersistenceId() == 0) { store->stage(this); } if (!content.get() || content->size() > 0) { // FIXME aconway 2007-02-07: handle MessageMessage. //set content to lazy loading mode (but only if there is stored content): //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is // then set as a member of that message so its lifetime is guaranteed to be no longer than // that of the message itself content = std::auto_ptr( new LazyLoadedContent(store, this, expectedContentSize())); } } void BasicMessage::setContent(std::auto_ptr& _content) { Mutex::ScopedLock locker(contentLock); content = _content; }