/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/PersistableMessage.h" //#include "qpid/broker/MessageStore.h" //#include "qpid/broker/AsyncStore.h" #include "qpid/broker/EnqueueHandle.h" #include using namespace qpid::broker; namespace qpid { namespace broker { PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {} PersistableMessage::~PersistableMessage() {} void PersistableMessage::setIngressCompletion(boost::intrusive_ptr i) { ingressCompletion = i.get(); /** * What follows is a hack to account for the fact that the * AsyncCompletion to use may be, but is not always, this same * object. * * This is hopefully temporary, and allows the store interface to * remain unchanged without requiring another object to be allocated * for every message. * * The case in question is where a message previously passed to * the store is modified by some other queue onto which it is * pushed, and then again persisted to the store. These will be * two separate PersistableMessage instances (since the latter now * has different content), but need to share the same * AsyncCompletion (since they refer to the same incoming transfer * command). */ if (static_cast(ingressCompletion) != static_cast(this)) { holder = i; } } void PersistableMessage::flush() { //TODO: is this really the right place for this? } //// deprecated //void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*) //{ // enqueueStart(); //} void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, AsyncStore*) { enqueueStart(); } //// deprecated //void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) {} bool PersistableMessage::isDequeueComplete() { return false; } void PersistableMessage::dequeueComplete() {} MessageHandle& PersistableMessage::createMessageHandle(AsyncStore* const store) { assert (store != 0); msgHandle = store->createMessageHandle(this); return msgHandle; } EnqueueHandle& PersistableMessage::createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore) { std::map::iterator ehi = enqueueHandles.find(queueHandle); if (ehi == enqueueHandles.end()) { assert (asyncStore != 0); ehi = enqueueHandles.insert(std::pair(queueHandle, asyncStore->createEnqueueHandle(msgHandle, queueHandle))).first; } return ehi->second; } void PersistableMessage::removeEnqueueHandle(QueueHandle& queueHandle) { std::map::iterator ehi = enqueueHandles.find(queueHandle); if (ehi != enqueueHandles.end()) { enqueueHandles.erase(ehi); } } EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) { std::map::iterator ehi = enqueueHandles.find(queueHandle); assert (ehi != enqueueHandles.end()); return ehi->second; } const EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) const { std::map::const_iterator ehci = enqueueHandles.find(queueHandle); assert (ehci != enqueueHandles.end()); return ehci->second; } uint64_t PersistableMessage::getSize() { return 0; } // TODO: kpvdr: implement void PersistableMessage::write(char* /*target*/) {} // TODO: kpvdr: implement }}