diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 90 |
1 files changed, 67 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2411e0520c..2382205268 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,12 +20,16 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/ConfigAsyncContext.h" +#include "qpid/broker/ConfigHandle.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/MessageStoreModule.h" -#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/MessageStoreModule.h" +//#include "qpid/broker/NullMessageStore.h" +//#include "qpid/broker/RecoveryAsyncContext.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" #include "qpid/broker/SecureConnectionFactory.h" @@ -34,6 +38,8 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" +#include "qpid/broker/RecoveryAsyncContext.h" +#include "qpid/broker/RecoveryHandle.h" #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" @@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& conf) : managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) : 0), - store(new NullMessageStore), +// store(new NullMessageStore), +// asyncStore(0), + asyncResultQueue(poller), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), @@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& conf) : MessageGroupManager::setDefaults(conf.defaultMsgGroup); // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) - setStore(); +// if (NullMessageStore::isNullStore(store.get())) +// setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if (store.get() != 0) { +// if (store.get() != 0) { + if (asyncStore.get() != 0) { // The cluster plug-in will setRecovery(false) on all but the first // broker to join a cluster. if (getRecovery()) { + QPID_LOG(info, "Store recovery starting") RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); - store->recover(recoverer); + RecoveryHandle rh = asyncStore->createRecoveryHandle(); + boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue)); + asyncStore->submitRecover(rh, rac); +// store->recover(recoverer); } else { QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); - store->truncateInit(true); // save old files in subdir +// store->truncateInit(true); // save old files in subdir + asyncStore->initialize(true, true); } } +// debug + else QPID_LOG(info, ">>>> No store!!!!") //ensure standard exchanges exist (done after recovery from store) declareStandardExchange(amq_direct, DirectExchange::typeName); @@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& conf) : void Broker::declareStandardExchange(const std::string& name, const std::string& type) { - bool storeEnabled = store.get() != NULL; +// bool storeEnabled = store.get() != NULL; + bool storeEnabled = asyncStore.get() != NULL; std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { - store->create(*status.first, framing::FieldTable ()); +// store->create(*status.first, framing::FieldTable ()); + ConfigHandle ch = asyncStore->createConfigHandle(); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitCreate(ch, status.first.get(), bc); } } @@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) return boost::intrusive_ptr<Broker>(new Broker(opts)); } -void Broker::setStore (boost::shared_ptr<MessageStore>& _store) +//void Broker::setStore (boost::shared_ptr<MessageStore>& _store) +//{ +// store.reset(new MessageStoreModule (_store)); +// setStore(); +//} + +void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore) { - store.reset(new MessageStoreModule (_store)); +// asyncStore.reset(_asyncStore.get()); + asyncStore = _asyncStore; setStore(); } -void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/) -{ - // TODO: Provide implementation for async store interface +void Broker::setStore () { +// queues.setStore (store.get()); + queues.setStore(asyncStore.get()); +// dtxManager.setStore (store.get()); + dtxManager.setStore(asyncStore.get()); +// links.setStore (store.get()); + links.setStore(asyncStore.get()); } -void Broker::setStore () { - queues.setStore (store.get()); - dtxManager.setStore (store.get()); - links.setStore (store.get()); +// static +void Broker::recoverComplete(const AsyncResultHandle* const arh) { + std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } +// static +void Broker::configureComplete(const AsyncResultHandle* const arh) { + std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + + void Broker::run() { if (config.workerThreads > 0) { QPID_LOG(notice, "Broker running"); @@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name, return Manageable::STATUS_UNKNOWN_OBJECT; } q->query( results ); - return Manageable::STATUS_OK;; + return Manageable::STATUS_OK; } Manageable::status_t Broker::getTimestampConfig(bool& receive, @@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( alternate->incAlternateUsers(); } if (durable) { - store->create(*result.first, arguments); +// store->create(*result.first, arguments); + ConfigHandle ch = asyncStore->createConfigHandle(); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitCreate(ch, result.first.get(), bc); } if (managementAgent.get()) { //TODO: debatable whether we should raise an event here for @@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) store->destroy(*exchange); +// if (exchange->isDurable()) store->destroy(*exchange); + if (exchange->isDurable()) { +// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); +// asyncStore->submitDestroy(exchange.getHandle(), bc); + } if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name); @@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& queueName, } else { if (exchange->unbind(queue, key, 0)) { if (exchange->isDurable() && queue->isDurable()) { - store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); +// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + // TODO: kpvdr: Async config destroy here } getConfigurationObservers().unbind( exchange, queue, key, framing::FieldTable()); |
