summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp90
1 files changed, 67 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2411e0520c..2382205268 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -20,12 +20,16 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/ConfigAsyncContext.h"
+#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/MessageStoreModule.h"
-#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/MessageStoreModule.h"
+//#include "qpid/broker/NullMessageStore.h"
+//#include "qpid/broker/RecoveryAsyncContext.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/SecureConnectionFactory.h"
@@ -34,6 +38,8 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/RecoveryHandle.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
@@ -190,7 +196,9 @@ Broker::Broker(const Broker::Options& conf) :
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
- store(new NullMessageStore),
+// store(new NullMessageStore),
+// asyncStore(0),
+ asyncResultQueue(poller),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -270,23 +278,31 @@ Broker::Broker(const Broker::Options& conf) :
MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
- setStore();
+// if (NullMessageStore::isNullStore(store.get()))
+// setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if (store.get() != 0) {
+// if (store.get() != 0) {
+ if (asyncStore.get() != 0) {
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
+ QPID_LOG(info, "Store recovery starting")
RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
- store->recover(recoverer);
+ RecoveryHandle rh = asyncStore->createRecoveryHandle();
+ boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+ asyncStore->submitRecover(rh, rac);
+// store->recover(recoverer);
}
else {
QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
- store->truncateInit(true); // save old files in subdir
+// store->truncateInit(true); // save old files in subdir
+ asyncStore->initialize(true, true);
}
}
+// debug
+ else QPID_LOG(info, ">>>> No store!!!!")
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -357,10 +373,14 @@ Broker::Broker(const Broker::Options& conf) :
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
- bool storeEnabled = store.get() != NULL;
+// bool storeEnabled = store.get() != NULL;
+ bool storeEnabled = asyncStore.get() != NULL;
std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
if (status.second && storeEnabled) {
- store->create(*status.first, framing::FieldTable ());
+// store->create(*status.first, framing::FieldTable ());
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, status.first.get(), bc);
}
}
@@ -377,23 +397,39 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
+//{
+// store.reset(new MessageStoreModule (_store));
+// setStore();
+//}
+
+void Broker::setStore(boost::shared_ptr<AsyncStore>& _asyncStore)
{
- store.reset(new MessageStoreModule (_store));
+// asyncStore.reset(_asyncStore.get());
+ asyncStore = _asyncStore;
setStore();
}
-void Broker::setAsyncStore(boost::shared_ptr<AsyncStore>& /*asyncStore*/)
-{
- // TODO: Provide implementation for async store interface
+void Broker::setStore () {
+// queues.setStore (store.get());
+ queues.setStore(asyncStore.get());
+// dtxManager.setStore (store.get());
+ dtxManager.setStore(asyncStore.get());
+// links.setStore (store.get());
+ links.setStore(asyncStore.get());
}
-void Broker::setStore () {
- queues.setStore (store.get());
- dtxManager.setStore (store.get());
- links.setStore (store.get());
+// static
+void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
+// static
+void Broker::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -926,7 +962,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_UNKNOWN_OBJECT;
}
q->query( results );
- return Manageable::STATUS_OK;;
+ return Manageable::STATUS_OK;
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
@@ -1168,7 +1204,10 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
alternate->incAlternateUsers();
}
if (durable) {
- store->create(*result.first, arguments);
+// store->create(*result.first, arguments);
+ ConfigHandle ch = asyncStore->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitCreate(ch, result.first.get(), bc);
}
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1208,7 +1247,11 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) store->destroy(*exchange);
+// if (exchange->isDurable()) store->destroy(*exchange);
+ if (exchange->isDurable()) {
+// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ }
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1285,7 +1328,8 @@ void Broker::unbind(const std::string& queueName,
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
- store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ // TODO: kpvdr: Async config destroy here
}
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());