summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
committerKim van der Riet <kpvdr@apache.org>2012-10-18 13:37:42 +0000
commit172d9b2a16cfb817bbe632d050acba7e31401cd2 (patch)
tree7c5dd5ccba8734a455f20bccaae1cb80a5483b91
parentc095a631dcb2c7be5e167ed50f658f7c24330a45 (diff)
downloadqpid-python-172d9b2a16cfb817bbe632d050acba7e31401cd2.tar.gz
WIP - async store interface working for configuration (adding and removing queues, links and exchanges) and for enqueues and dequeues of messages. Transactions are not yet included, and hence some tests will fail.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1399662 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt6
-rw-r--r--cpp/src/qpid/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp4
-rw-r--r--cpp/src/qpid/broker/Bridge.h2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp21
-rw-r--r--cpp/src/qpid/broker/Broker.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp14
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h9
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxManager.h2
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h4
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp34
-rw-r--r--cpp/src/qpid/broker/Exchange.h25
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h8
-rw-r--r--cpp/src/qpid/broker/Handle.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h8
-rw-r--r--cpp/src/qpid/broker/Link.cpp6
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h2
-rw-r--r--cpp/src/qpid/broker/Lvq.cpp2
-rw-r--r--cpp/src/qpid/broker/PersistableExchange.h10
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp39
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h13
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h13
-rw-r--r--cpp/src/qpid/broker/Queue.cpp92
-rw-r--r--cpp/src/qpid/broker/Queue.h14
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.cpp52
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h14
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueBindings.h2
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueFactory.h2
-rw-r--r--cpp/src/qpid/broker/QueueHandle.cpp8
-rw-r--r--cpp/src/qpid/broker/QueueHandle.h3
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--cpp/src/qpid/broker/RecoverableExchange.h4
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.h3
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h8
-rw-r--r--cpp/src/qpid/broker/TransactionalStore.h1
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp10
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h5
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp4
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp5
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h3
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp10
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h6
-rw-r--r--cpp/src/tests/ExchangeTest.cpp46
-rw-r--r--cpp/src/tests/HeadersExchangeTest.cpp2
-rw-r--r--cpp/src/tests/QueueTest.cpp6
61 files changed, 393 insertions, 198 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index ee25393f25..9b641558f8 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1139,6 +1139,7 @@ set (qpidbroker_SOURCES
qpid/broker/DtxManager.cpp
qpid/broker/DtxTimeout.cpp
qpid/broker/DtxWorkRecord.cpp
+ qpid/broker/EnqueueHandle.cpp
qpid/broker/ExchangeRegistry.cpp
qpid/broker/FanOutExchange.cpp
qpid/broker/HeadersExchange.cpp
@@ -1153,11 +1154,13 @@ set (qpidbroker_SOURCES
# qpid/broker/MessageStoreModule.cpp
qpid/broker/NameGenerator.cpp
# qpid/broker/NullMessageStore.cpp
+ qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueBindings.cpp
qpid/broker/QueuedMessage.cpp
qpid/broker/QueueCursor.cpp
qpid/broker/QueueDepth.cpp
qpid/broker/QueueFactory.cpp
+ qpid/broker/QueueHandle.cpp
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueSettings.cpp
qpid/broker/QueueFlowLimit.cpp
@@ -1184,6 +1187,7 @@ set (qpidbroker_SOURCES
qpid/broker/TopicExchange.cpp
qpid/broker/TxAccept.cpp
qpid/broker/TxBuffer.cpp
+ qpid/broker/TxnHandle.cpp
qpid/broker/Vhost.cpp
qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/management/ManagementAgent.cpp
@@ -1191,8 +1195,6 @@ set (qpidbroker_SOURCES
qpid/management/ManagementTopicExchange.cpp
qpid/sys/TCPIOPlugin.cpp
# New async store objects and new versions of broker objects
-# qpid/broker/AsyncResultHandle.cpp
-# qpid/broker/AsyncResultHandleImpl.cpp
# qpid/broker/IdHandle.cpp
# qpid/broker/TxnAsyncContext.cpp
# qpid/broker/TxnBuffer.cpp
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index e5019604d2..3acf9fe715 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -126,7 +126,7 @@ void SessionState::senderRecord(const AMQFrame& f) {
sender.incomplete += sender.sendPoint.command;
sender.sendPoint.advance(f);
if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
- throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
+ throw ResourceLimitExceededException("Replay buffer exceeded hard limit");
}
static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536;
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index d1706b5907..4604ac643f 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -86,7 +86,7 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create(Connection& c)
+void Bridge::create(Connection& c, AsyncStore* const store)
{
detached = false; // Reset detached in case we are recovering.
connState = &c;
@@ -153,7 +153,7 @@ void Bridge::create(Connection& c)
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
- exchange->registerDynamicBridge(this);
+ exchange->registerDynamicBridge(this, store);
QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index ee298afd45..04ac585d80 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -141,7 +141,7 @@ class Bridge : public PersistableConfig,
bool resetProxy();
// connection Management (called by owning Link)
- void create(Connection& c);
+ void create(Connection& c, AsyncStore* const store);
void cancel(Connection& c);
void closed();
friend class Link; // to call create, cancel, closed()
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2382205268..08606516d4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -421,15 +421,14 @@ void Broker::setStore () {
// static
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
void Broker::configureComplete(const AsyncResultHandle* const arh) {
- std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+ std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
-
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(notice, "Broker running");
@@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
if (result.second) {
//add default binding:
- result.first->bind(exchanges.getDefault(), name);
+ result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
if (managementAgent.get()) {
//TODO: debatable whether we should raise an event here for
@@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
+ result.first->setHandle(ch);
boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
@@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId,
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
-// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
-// asyncStore->submitDestroy(exchange.getHandle(), bc);
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ asyncStore->submitDestroy(exchange->getHandle(), bc);
+ exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name);
@@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName,
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
} else {
- if (exchange->unbind(queue, key, 0)) {
- if (exchange->isDurable() && queue->isDurable()) {
+ if (exchange->unbind(queue, key, 0, asyncStore.get())) {
+// Move this block into Exchange which keeps the broker context.
+// if (exchange->isDurable() && queue->isDurable()) {
// store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
- // TODO: kpvdr: Async config destroy here
- }
+// }
getConfigurationObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index e4d1d93423..698d446bca 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -222,7 +222,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
// QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
void setStore(boost::shared_ptr<AsyncStore>& asyncStore);
// MessageStore& getStore() { return *store; }
- AsyncStore& getStore() { return *asyncStore; }
+// AsyncStore& getStore() { return *asyncStore; }
+ AsyncStore* getStore() { return asyncStore.get(); }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
QueueRegistry& getQueues() { return queues; }
@@ -231,6 +232,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
+ AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index c56a1da6cc..b1130c3ec0 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -53,7 +53,7 @@ DirectExchange::DirectExchange(const string& _name, bool _durable,
mgmtExchange->set_type(typeName);
}
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const store)
{
string fedOp(fedOpBind);
string fedTags;
@@ -88,6 +88,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
+ persistBind(b, store);
+
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
BoundKey& bk = bindings[routingKey];
@@ -97,7 +99,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
- unbind(queue, routingKey, args);
+ unbind(queue, routingKey, args, store);
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
@@ -127,7 +129,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
return true;
}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
@@ -156,6 +158,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
return true;
}
+//boost::shared_ptr<Exchange::Binding> DirectExchange::getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) {
+// Mutex::ScopedLock l(lock);
+// BoundKey& bk = bindings[routingKey];
+// return bk.queues[routingKey];
+//}
+
void DirectExchange::route(Deliverable& msg)
{
const string& routingKey = msg.getMessage().getRoutingKey();
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index a0e1477d0c..6f0d470c54 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -55,12 +55,17 @@ public:
QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
- virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
+ virtual bool unbind(boost::shared_ptr<Queue> queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
+// boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey);
QPID_BROKER_EXTERN virtual ~DirectExchange();
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 7e2eb927a3..c55771c4e6 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -175,7 +175,7 @@ void DtxManager::DtxCleanup::fire()
}
//void DtxManager::setStore (TransactionalStore* _store)
-void DtxManager::setStore (AsyncTransactionalStore* _ats)
+void DtxManager::setStore (AsyncTransactionalStore* const _ats)
{
// store = _store;
asyncTxnStore = _ats;
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index fe20a89c32..cbc66d6391 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -68,7 +68,7 @@ public:
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
// void setStore(TransactionalStore* store);
- void setStore(AsyncTransactionalStore* ats);
+ void setStore(AsyncTransactionalStore* const ats);
void setTimer(sys::Timer& t) { timer = &t; }
// Used by cluster for replication.
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index 9dd86bdcad..579579df2d 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -50,7 +50,7 @@ class DtxWorkRecord
const std::string xid;
// TransactionalStore* const store;
- AsyncTransactionalStore* const asyncTxnStore;
+ AsyncTransactionalStore* asyncTxnStore;
bool completed;
bool rolledback;
bool prepared;
@@ -66,7 +66,7 @@ class DtxWorkRecord
public:
QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid,
// TransactionalStore* const store);
- AsyncTransactionalStore* const store);
+ AsyncTransactionalStore* const store);
QPID_BROKER_EXTERN ~DtxWorkRecord();
QPID_BROKER_EXTERN bool prepare();
QPID_BROKER_EXTERN bool commit(bool onePhase);
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index bb5dc2b807..2414981481 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -19,7 +19,9 @@
*
*/
+#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConfigAsyncContext.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -299,7 +301,7 @@ ManagementObject* Exchange::GetManagementObject (void) const
return (ManagementObject*) mgmtExchange;
}
-void Exchange::registerDynamicBridge(DynamicBridge* db)
+void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store)
{
if (!supportsDynamicBinding())
throw Exception("Exchange type does not support dynamic binding");
@@ -315,7 +317,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db)
FieldTable args;
args.setString(qpidFedOp, fedOpReorigin);
- bind(Queue::shared_ptr(), string(), &args);
+ bind(Queue::shared_ptr(), string(), &args, store);
}
void Exchange::removeDynamicBridge(DynamicBridge* db)
@@ -344,8 +346,8 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons
}
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
- FieldTable _args, const string& _origin)
- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+ FieldTable _args, const string& _origin, ConfigHandle _cfgHandle)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0)
{
}
@@ -388,6 +390,30 @@ ManagementObject* Exchange::Binding::GetManagementObject () const
return (ManagementObject*) mgmtBinding;
}
+uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence
+void Exchange::Binding::write(char* /*target*/) {} // TODO: kpvdr: implement persistence
+
+void Exchange::persistBind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ b->cfgHandle = s->createConfigHandle();
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitCreate(b->cfgHandle, b.get(), bc);
+ }
+}
+
+void Exchange::persistUnbind(Binding::shared_ptr b, AsyncStore* const s) {
+ if (s && broker != 0 && b->queue->isDurable() && isDurable()) {
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue()));
+ s->submitDestroy(b->cfgHandle, bc);
+ b->cfgHandle.reset();
+ }
+}
+
+// static
+void Exchange::configureComplete(const AsyncResultHandle* const arh) {
+ std::cout << "@@@@ Exchange: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index b4c6b799a4..df6d5d05a4 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -23,7 +23,6 @@
*/
#include <boost/shared_ptr.hpp>
-#include <qpid/broker/AsyncStore.h>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -36,7 +35,7 @@
#include "qmf/org/apache/qpid/broker/Binding.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
-#include <set>
+//#include <set>
namespace qpid {
namespace broker {
@@ -44,9 +43,9 @@ namespace broker {
class Broker;
class ExchangeRegistry;
-class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable {
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
public:
- struct Binding : public management::Manageable {
+ struct Binding : public DataSource, public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
@@ -55,13 +54,19 @@ public:
const std::string key;
const framing::FieldTable args;
std::string origin;
+ ConfigHandle cfgHandle;
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
- framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string(),
+ ConfigHandle cfgHandle = ConfigHandle());
~Binding();
void startManagement();
management::ManagementObject* GetManagementObject() const;
+
+ // DataSource implementation - allows for persistence
+ uint64_t getSize();
+ void write(char* target);
};
private:
@@ -71,6 +76,7 @@ private:
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ static void configureComplete(const AsyncResultHandle* const);
protected:
mutable qpid::framing::FieldTable args;
@@ -92,6 +98,8 @@ protected:
typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
+ void persistBind(Binding::shared_ptr b, AsyncStore* const s);
+ void persistUnbind(Binding::shared_ptr b, AsyncStore* const s);
struct MatchQueue {
@@ -197,9 +205,10 @@ public:
*
*/
- virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0;
+ virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+// virtual boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) = 0;
//QPID_BROKER_EXTERN virtual void setProperties(Message&);
virtual void route(Deliverable& msg) = 0;
@@ -224,7 +233,7 @@ public:
virtual const std::string& getLocalTag() const = 0;
};
- void registerDynamicBridge(DynamicBridge* db);
+ void registerDynamicBridge(DynamicBridge* db, AsyncStore* const store);
void removeDynamicBridge(DynamicBridge* db);
virtual bool supportsDynamicBinding() { return false; }
Broker* getBroker() const { return broker; }
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 5b7e0c7324..941d909778 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -46,7 +46,7 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const store)
{
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
string fedTags(args ? args->getAsString(qpidFedTags) : "");
@@ -69,7 +69,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
} else if (fedOp == fedOpUnbind) {
propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
if (fedBinding.countFedBindings(queue->getName()) == 0)
- unbind(queue, "", args);
+ unbind(queue, "", args, store);
} else if (fedOp == fedOpReorigin) {
if (fedBinding.hasLocal()) {
propagateFedOp(string(), string(), fedOpBind, string());
@@ -82,7 +82,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
return true;
}
-bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index dc301a4266..e6a8f726d6 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -50,9 +50,13 @@ class FanOutExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h
index 397f58f2e7..31c029a512 100644
--- a/cpp/src/qpid/broker/Handle.h
+++ b/cpp/src/qpid/broker/Handle.h
@@ -65,6 +65,8 @@ template <class T> class Handle {
void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
+ void reset() { impl = 0; }
+
protected:
typedef T Impl;
Handle() :impl() {}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index ec19765387..76ffa7a922 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -167,7 +167,7 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
string fedOp(fedOpBind);
string fedTags;
@@ -221,7 +221,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
- unbind(queue, bindingKey, args);
+ unbind(queue, bindingKey, args, store);
}
} else if (fedOp == fedOpReorigin) {
@@ -246,7 +246,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
return true;
}
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args, AsyncStore* const /*store*/){
bool propagate = false;
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
{
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index ff0fec4212..10bf9c8c0b 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -91,9 +91,13 @@ class HeadersExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 416c3b7d34..9727040c9b 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -86,8 +86,8 @@ public:
std::string getType() const { return Link::exchangeTypeName; }
// Exchange methods - set up to prevent binding/unbinding etc from clients!
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; }
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; }
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
// Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
@@ -480,7 +480,7 @@ void Link::ioThreadProcessing()
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
- (*i)->create(*connection);
+ (*i)->create(*connection, broker->getStore());
}
created.clear();
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 43ed208eba..a79081b8ed 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -274,7 +274,7 @@ void LinkRegistry::destroyBridge(Bridge *bridge)
}
//void LinkRegistry::setStore (MessageStore* _store)
-void LinkRegistry::setStore (AsyncStore* _asyncStore) {
+void LinkRegistry::setStore (AsyncStore* const _asyncStore) {
asyncStore = _asyncStore;
}
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index a30f91e2a0..17f45a60c8 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -132,7 +132,7 @@ namespace broker {
* Set the store to use. May only be called once.
*/
// QPID_BROKER_EXTERN void setStore (MessageStore*);
- QPID_BROKER_EXTERN void setStore (AsyncStore*);
+ QPID_BROKER_EXTERN void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp
index 0bededb966..f5e66c8a74 100644
--- a/cpp/src/qpid/broker/Lvq.cpp
+++ b/cpp/src/qpid/broker/Lvq.cpp
@@ -27,7 +27,7 @@ namespace qpid {
namespace broker {
//Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
// : Queue(n, s, ms, p, b), messageMap(*m)
-Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b)
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* as, management::Manageable* p, Broker* b)
: Queue(n, s, as, p, b), messageMap(*m)
{
messages = m;
diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h
index e1a0853247..ffea40493d 100644
--- a/cpp/src/qpid/broker/PersistableExchange.h
+++ b/cpp/src/qpid/broker/PersistableExchange.h
@@ -23,6 +23,8 @@
*/
#include <string>
+#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/ConfigHandle.h"
#include "qpid/broker/Persistable.h"
namespace qpid {
@@ -32,11 +34,17 @@ namespace broker {
* The interface exchanges must expose to the MessageStore in order to be
* persistable.
*/
-class PersistableExchange : public Persistable
+class PersistableExchange : public Persistable, public DataSource
{
public:
virtual const std::string& getName() const = 0;
virtual ~PersistableExchange() {};
+ ConfigHandle& getHandle() { return configHandle; }
+ void setHandle(ConfigHandle& ch) { configHandle = ch; }
+ void resetHandle() { configHandle.reset(); }
+
+protected:
+ ConfigHandle configHandle;
};
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 2ef9fbfcbb..4645e5526d 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/PersistableMessage.h"
//#include "qpid/broker/MessageStore.h"
//#include "qpid/broker/AsyncStore.h"
+#include "qpid/broker/EnqueueHandle.h"
#include <iostream>
using namespace qpid::broker;
@@ -83,6 +84,44 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*)
bool PersistableMessage::isDequeueComplete() { return false; }
void PersistableMessage::dequeueComplete() {}
+MessageHandle& PersistableMessage::createMessageHandle(AsyncStore* const store) {
+ assert (store != 0);
+ msgHandle = store->createMessageHandle(this);
+ return msgHandle;
+}
+
+EnqueueHandle& PersistableMessage::createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ if (ehi == enqueueHandles.end()) {
+ assert (asyncStore != 0);
+ ehi = enqueueHandles.insert(std::pair<QueueHandle, EnqueueHandle>(queueHandle,
+ asyncStore->createEnqueueHandle(msgHandle, queueHandle))).first;
+ }
+ return ehi->second;
+}
+
+void PersistableMessage::removeEnqueueHandle(QueueHandle& queueHandle) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ if (ehi != enqueueHandles.end()) {
+ enqueueHandles.erase(ehi);
+ }
+}
+
+EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) {
+ std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle);
+ assert (ehi != enqueueHandles.end());
+ return ehi->second;
+}
+
+const EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) const {
+ std::map<QueueHandle, EnqueueHandle>::const_iterator ehci = enqueueHandles.find(queueHandle);
+ assert (ehci != enqueueHandles.end());
+ return ehci->second;
+}
+
+uint64_t PersistableMessage::getSize() { return 0; } // TODO: kpvdr: implement
+void PersistableMessage::write(char* /*target*/) {} // TODO: kpvdr: implement
+
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 0fd3d169b4..a69d00ca71 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -41,14 +41,16 @@ class Variant;
}
namespace broker {
+class EnqueueHandle;
class MessageStore;
class AsyncStore;
class Queue;
+class QueueHandle;
/**
* Base class for persistable messages.
*/
-class PersistableMessage : public Persistable
+class PersistableMessage : public Persistable, public DataSource
{
/**
* "Ingress" messages == messages sent _to_ the broker.
@@ -63,6 +65,7 @@ class PersistableMessage : public Persistable
boost::intrusive_ptr<AsyncCompletion> holder;
mutable uint64_t persistenceId;
MessageHandle msgHandle;
+ std::map<QueueHandle, EnqueueHandle> enqueueHandles;
public:
PersistableMessage();
@@ -96,9 +99,17 @@ class PersistableMessage : public Persistable
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+ MessageHandle& createMessageHandle(AsyncStore* const store);
MessageHandle& getMessageHandle() { return msgHandle; }
const MessageHandle& getMessagehandle() const { return msgHandle; }
+ EnqueueHandle& createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore);
+ void removeEnqueueHandle(QueueHandle& queueHandle);
+ EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle);
+ const EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle) const;
+
+ uint64_t getSize();
+ void write(char* target);
virtual void decodeHeader(framing::Buffer& buffer) = 0;
virtual void decodeContent(framing::Buffer& buffer) = 0;
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index 655d26bc74..ed8c193245 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -23,12 +23,15 @@
*/
#include <string>
+#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/Persistable.h"
+#include "qpid/broker/QueueHandle.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+class AsyncResultHandle;
/**
@@ -48,7 +51,7 @@ public:
* The interface queues must expose to the MessageStore in order to be
* persistable.
*/
-class PersistableQueue : public Persistable
+class PersistableQueue : public Persistable, public DataSource
{
public:
typedef boost::shared_ptr<PersistableQueue> shared_ptr;
@@ -62,14 +65,14 @@ public:
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
virtual void flush() = 0;
- inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
+ inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}
+ inline QueueHandle& getQueueHandle() { return queueHandle; }
- PersistableQueue():externalQueueStore(NULL){
- };
+ PersistableQueue():externalQueueStore(NULL) {}
protected:
ExternalQueueStore* externalQueueStore;
-
+ QueueHandle queueHandle;
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40574ded3b..f595b81724 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
//#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
@@ -818,9 +819,17 @@ bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
// pmsg->enqueueAsync(shared_from_this(), store);
- pmsg->enqueueAsync(shared_from_this(), asyncStore);
// store->enqueue(ctxt, pmsg, *this);
- // TODO - kpvdr: async enqueue here
+ pmsg->enqueueAsync(shared_from_this(), asyncStore);
+ pmsg->createMessageHandle(asyncStore);
+ EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &enqueueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitEnqueue(eh, th, qac);
}
return true;
}
@@ -893,7 +902,15 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
// if (store && pmsg) {
if (asyncStore && pmsg) {
// store->dequeue(ctxt, pmsg, *this);
- // TODO: kpvdr: async dequeue here
+ pmsg->dequeueAsync(shared_from_this(), asyncStore);
+ TxnHandle th; // TODO: kpvdr: Impement transactions
+ EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle);
+ boost::shared_ptr<QueueAsyncContext> qac(
+ new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
+ pmsg,
+ &dequeueComplete,
+ &broker->getAsyncResultQueue()));
+ asyncStore->submitDequeue(eh, th, qac);
}
}
@@ -908,6 +925,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
}
+
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
@@ -997,7 +1015,9 @@ void Queue::create()
// if (store) {
if (asyncStore) {
// store->create(*this, settings.storeSettings);
- // TODO: kpvdr: async store create here
+ queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map());
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &createComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitCreate(queueHandle, this, qac);
}
}
@@ -1068,12 +1088,14 @@ void Queue::destroyed()
if (asyncStore) {
barrier.destroy();
// store->flush(*this);
- // TODO: kpvdr: async flush here
+ boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitFlush(queueHandle, flush_qac);
// store->destroy(*this);
- // TODO: kpvdr: async destroy here
+ boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue()));
+ asyncStore->submitDestroy(queueHandle, destroy_qac);
// store = 0;//ensure we make no more calls to the store for this queue
// TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
- // will cause store to be destroyed when all outstanding async ops are complete.
+ // will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete.
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
@@ -1102,7 +1124,7 @@ void Queue::bound(const string& exchange, const string& key,
void Queue::unbind(ExchangeRegistry& exchanges)
{
- bindings.unbind(exchanges, shared_from_this());
+ bindings.unbind(exchanges, shared_from_this(), asyncStore);
}
uint64_t Queue::getPersistenceId() const
@@ -1274,6 +1296,46 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+uint64_t Queue::getSize() { return 0; } // TODO: kpvdr: implement
+void Queue::write(char* /*target*/) {} // TODO: kpvdr: implement
+
+// static
+void Queue::createComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::dequeueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<PersistableQueue> pq = qac->getQueue();
+ boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage();
+ QueueHandle& qh = pq->getQueueHandle();
+ pmsg->dequeueComplete();
+ pmsg->removeEnqueueHandle(qh);
+// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::destroyComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ // TODO: kpvdr: set Queue::asyncStore = 0 from here.
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::enqueueComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ qac->getMessage()->enqueueComplete();
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
+//static
+void Queue::flushComplete(const AsyncResultHandle* const arh) {
+ boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
+}
+
void Queue::countRejected() const
{
if (mgmtObject) {
@@ -1464,19 +1526,23 @@ void Queue::flush()
ScopedUse u(barrier);
// if (u.acquired && store) store->flush(*this);
// TODO: kpvdr: Async store flush here
- if (u.acquired && asyncStore) { /*store->flush(*this);*/ }
+ if (u.acquired && asyncStore) {
+ //store->flush(*this);
+ std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush;
+ }
}
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments)
{
- if (exchange->bind(shared_from_this(), key, &arguments)) {
+ if (exchange->bind(shared_from_this(), key, &arguments, asyncStore)) {
bound(exchange->getName(), key, arguments);
- if (exchange->isDurable() && isDurable()) {
+// Move this to Exchange::bind() which keeps the binding context
+// if (exchange->isDurable() && isDurable()) {
// store->bind(*exchange, *this, key, arguments);
- // TODO: kpvdr: Store configuration here
- }
+// // TODO: kpvdr: Store configuration here
+// }
return true;
} else {
return false;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 28fa8b5ca9..1294f813aa 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -116,7 +116,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const std::string name;
// MessageStore* store;
- AsyncStore* asyncStore;
+ AsyncStore* const asyncStore;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
@@ -194,6 +194,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
+ static void createComplete(const AsyncResultHandle* const arh);
+ static void dequeueComplete(const AsyncResultHandle* const arh);
+ static void destroyComplete(const AsyncResultHandle* const arh);
+ static void enqueueComplete(const AsyncResultHandle* const arh);
+ static void flushComplete(const AsyncResultHandle* const arh);
+
public:
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -233,7 +239,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN bool bind(
boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+ const qpid::framing::FieldTable& arguments/*=qpid::framing::FieldTable()*/);
/**
* Removes (and dequeues) a message by its sequence number (used
@@ -336,6 +342,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
virtual void setExternalQueueStore(ExternalQueueStore* inst);
+ // Implement DataStore, allows Queue to persist its configuration
+ uint64_t getSize();
+ void write(char* target);
+
// Increment the rejected-by-consumer counter.
QPID_BROKER_EXTERN void countRejected() const;
QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const;
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp
index 1bad5387a3..24ecaf6b5d 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.cpp
+++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp
@@ -33,67 +33,67 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
+ m_pmsg(0),
+ m_tb(0),
m_rcb(rcb),
m_arq(arq)
-{}
+{
+ //assert(m_q.get() != 0);
+}
-/*
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
- m_msg(msg),
+ m_pmsg(msg),
+ m_tb(0),
m_rcb(rcb),
m_arq(arq)
-{}
-*/
+{
+ //assert(m_q.get() != 0);
+ //assert(m_pmsg.get() != 0);
+}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
+ m_pmsg(0),
m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
- assert(m_q.get() != 0);
+ //assert(m_q.get() != 0);
}
-/*
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
- m_msg(msg),
+ m_pmsg(msg),
m_tb(tb),
m_rcb(rcb),
m_arq(arq)
{
- assert(m_q.get() != 0);
- assert(m_msg.get() != 0);
+ //assert(m_q.get() != 0);
+ //assert(m_pmsg.get() != 0);
}
-*/
-QueueAsyncContext::~QueueAsyncContext()
-{}
+QueueAsyncContext::~QueueAsyncContext() {}
boost::shared_ptr<PersistableQueue>
-QueueAsyncContext::getQueue() const
-{
+QueueAsyncContext::getQueue() const {
return m_q;
}
-/*
boost::intrusive_ptr<PersistableMessage>
-QueueAsyncContext::getMessage() const
-{
- return m_msg;
+QueueAsyncContext::getMessage() const {
+ return m_pmsg;
}
-*/
SimpleTxnBuffer*
QueueAsyncContext::getTxnBuffer() const {
@@ -101,28 +101,24 @@ QueueAsyncContext::getTxnBuffer() const {
}
AsyncResultQueue*
-QueueAsyncContext::getAsyncResultQueue() const
-{
+QueueAsyncContext::getAsyncResultQueue() const {
return m_arq;
}
AsyncResultCallback
-QueueAsyncContext::getAsyncResultCallback() const
-{
+QueueAsyncContext::getAsyncResultCallback() const {
return m_rcb;
}
void
-QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const
-{
+QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const {
if (m_rcb) {
m_rcb(arh);
}
}
void
-QueueAsyncContext::destroy()
-{
+QueueAsyncContext::destroy() {
delete this;
}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index 4988f2af39..2ce77232b9 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -36,7 +36,7 @@
namespace qpid {
namespace broker {
-//class PersistableMessage;
+class PersistableMessage;
class PersistableQueue;
typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
@@ -47,22 +47,22 @@ public:
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
-/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
AsyncResultCallback rcb,
- AsyncResultQueue* const arq);*/
+ AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
-/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+ QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
- AsyncResultQueue* const arq);*/
+ AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
boost::shared_ptr<PersistableQueue> getQueue() const;
-// boost::intrusive_ptr<PersistableMessage> getMessage() const;
+ boost::intrusive_ptr<PersistableMessage> getMessage() const;
SimpleTxnBuffer* getTxnBuffer() const;
AsyncResultQueue* getAsyncResultQueue() const;
AsyncResultCallback getAsyncResultCallback() const;
@@ -71,7 +71,7 @@ public:
private:
boost::shared_ptr<PersistableQueue> m_q;
-// boost::intrusive_ptr<PersistableMessage> m_msg;
+ boost::intrusive_ptr<PersistableMessage> m_pmsg;
SimpleTxnBuffer* m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index 1cc3486d9a..3d04b3123c 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -34,7 +34,7 @@ void QueueBindings::add(const string& exchange, const string& key, const FieldTa
bindings.push_back(QueueBinding(exchange, key, args));
}
-void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
+void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue, AsyncStore* const store)
{
Bindings local;
{
@@ -44,7 +44,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
for (Bindings::iterator i = local.begin(); i != local.end(); i++) {
Exchange::shared_ptr ex = exchanges.find(i->exchange);
- if (ex) ex->unbind(queue, i->key, &(i->args));
+ if (ex) ex->unbind(queue, i->key, &(i->args), store);
}
}
diff --git a/cpp/src/qpid/broker/QueueBindings.h b/cpp/src/qpid/broker/QueueBindings.h
index f9b07e7431..7e49c783e5 100644
--- a/cpp/src/qpid/broker/QueueBindings.h
+++ b/cpp/src/qpid/broker/QueueBindings.h
@@ -55,7 +55,7 @@ class QueueBindings
}
void add(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue);
+ void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue, AsyncStore* const store);
private:
mutable sys::Mutex lock;
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index 6ff3f832e4..f2d7fb8d35 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -42,7 +42,7 @@ namespace broker {
//QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
-QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {}
+QueueFactory::QueueFactory() : broker(0), asyncStore(), parent(0) {}
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
{
@@ -103,7 +103,7 @@ Broker* QueueFactory::getBroker()
return broker;
}
//void QueueFactory::setStore (MessageStore* s)
-void QueueFactory::setStore (AsyncStore* as)
+void QueueFactory::setStore (AsyncStore* const as)
{
asyncStore = as;
}
diff --git a/cpp/src/qpid/broker/QueueFactory.h b/cpp/src/qpid/broker/QueueFactory.h
index 9d0048e139..cc28356982 100644
--- a/cpp/src/qpid/broker/QueueFactory.h
+++ b/cpp/src/qpid/broker/QueueFactory.h
@@ -54,7 +54,7 @@ class QueueFactory
* Set the store to use. May only be called once.
*/
// void setStore (MessageStore*);
- void setStore (AsyncStore*);
+ void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp
index 9c8d7eba67..3c647cb66a 100644
--- a/cpp/src/qpid/broker/QueueHandle.cpp
+++ b/cpp/src/qpid/broker/QueueHandle.cpp
@@ -54,12 +54,4 @@ QueueHandle::operator=(const QueueHandle& r)
return PrivateImpl::assign(*this, r);
}
-// --- QueueHandleImpl methods ---
-
-const std::string&
-QueueHandle::getName() const
-{
- return impl->getName();
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h
index 1110367418..b25064d229 100644
--- a/cpp/src/qpid/broker/QueueHandle.h
+++ b/cpp/src/qpid/broker/QueueHandle.h
@@ -45,9 +45,6 @@ public:
~QueueHandle();
QueueHandle& operator=(const QueueHandle& r);
- // --- QueueHandleImpl methods ---
- const std::string& getName() const;
-
private:
friend class PrivateImplRef<QueueHandle>;
};
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 420a9caa28..eb525b6727 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -101,7 +101,7 @@ Queue::shared_ptr QueueRegistry::get(const string& name) {
}
//void QueueRegistry::setStore (MessageStore* _store)
-void QueueRegistry::setStore (AsyncStore* _store)
+void QueueRegistry::setStore (AsyncStore* const _store)
{
QueueFactory::setStore(_store);
}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index b274493f8d..ada76f9cca 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -98,7 +98,7 @@ class QueueRegistry : QueueFactory {
* Set the store to use. May only be called once.
*/
// void setStore (MessageStore*);
- void setStore (AsyncStore*);
+ void setStore (AsyncStore* const);
/**
* Return the message store used.
diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h
index ca6cc1541e..6bda1e2617 100644
--- a/cpp/src/qpid/broker/RecoverableExchange.h
+++ b/cpp/src/qpid/broker/RecoverableExchange.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace broker {
+class AsyncStore;
/**
* The interface through which bindings are recovered.
@@ -42,7 +43,8 @@ public:
*/
virtual void bind(const std::string& queue,
const std::string& routingKey,
- qpid::framing::FieldTable& args) = 0;
+ qpid::framing::FieldTable&,
+ AsyncStore* const store) = 0;
virtual ~RecoverableExchange() {};
};
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index 0cb7c544cd..f2d28c0328 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -31,12 +31,13 @@
namespace qpid {
namespace broker {
+class AsyncStore;
class RecoveryManager{
public:
virtual ~RecoveryManager(){}
virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
- virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
+ virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store) = 0;
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn) = 0;
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 7deeba5e65..f3e1639ca5 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -81,7 +81,7 @@ class RecoverableExchangeImpl : public RecoverableExchange
public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
- void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+ void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args, AsyncStore* const store);
};
class RecoverableConfigImpl : public RecoverableConfig
@@ -113,13 +113,13 @@ RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Bu
}
}
-RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
+RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer, AsyncStore* const store)
{
Queue::shared_ptr queue = Queue::restore(queues, buffer);
try {
Exchange::shared_ptr exchange = exchanges.getDefault();
if (exchange) {
- exchange->bind(queue, queue->getName(), 0);
+ exchange->bind(queue, queue->getName(), 0, store);
queue->bound(exchange->getName(), queue->getName(), framing::FieldTable());
}
} catch (const framing::NotFoundException& /*e*/) {
@@ -238,10 +238,11 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id)
void RecoverableExchangeImpl::bind(const string& queueName,
const string& key,
- framing::FieldTable& args)
+ framing::FieldTable& args,
+ AsyncStore* const store)
{
Queue::shared_ptr queue = queues.find(queueName);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, store);
queue->bound(exchange->getName(), key, args);
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 1ad7892b13..7fca0be194 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -42,7 +42,7 @@ namespace broker {
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
- RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
+ RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store);
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 97d6dc07b0..5fc9a1a932 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -156,7 +156,7 @@ void SemanticState::startTx()
}
//void SemanticState::commit(MessageStore* const store)
-void SemanticState::commit(AsyncStore* const store)
+void SemanticState::commit(AsyncTransactionalStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index a30c7e15b7..9add663e24 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -235,7 +235,7 @@ class SemanticState : private boost::noncopyable {
void startTx();
// void commit(MessageStore* const store);
- void commit(AsyncStore* const store);
+ void commit(AsyncTransactionalStore* const store);
void rollback();
void selectDtx();
bool getDtxSelected() const { return dtxSelected; }
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index a05934ed8e..cb2fe15b58 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -563,7 +563,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(getBroker().getStore());
}
void SessionAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 88cdf7e03a..944cbad0aa 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -401,7 +401,7 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
- // so *session is definately valid.
+ // so *session is definitely valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
session->completeRcvMsg(id, requiresAccept, requiresSync);
diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h
index c2f21076cd..da5c2d9ad6 100644
--- a/cpp/src/qpid/broker/SimpleQueue.h
+++ b/cpp/src/qpid/broker/SimpleQueue.h
@@ -51,8 +51,7 @@ class SimpleMessage;
class SimpleTxnBuffer;
class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
- public PersistableQueue,
- public DataSource
+ public PersistableQueue
{
public:
SimpleQueue(const std::string& name,
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index d9871a430b..38d8f255ac 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -156,7 +156,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
@@ -226,7 +226,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
return true;
}
-bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index c50ecf1830..329c2f408f 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -96,9 +96,13 @@ public:
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h
index 2a2bac0c51..9c844c1ee1 100644
--- a/cpp/src/qpid/broker/TransactionalStore.h
+++ b/cpp/src/qpid/broker/TransactionalStore.h
@@ -18,6 +18,7 @@
* under the License.
*
*/
+
#ifndef _TransactionalStore_
#define _TransactionalStore_
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0a66527e98..3a3c9c2954 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -398,7 +398,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, 0);
}
}
@@ -418,7 +418,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, &args, 0);
}
}
@@ -514,7 +514,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
<< " key:" << key);
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- exchange->bind(queue, key, &args);
+ exchange->bind(queue, key, &args, 0);
}
}
@@ -616,8 +616,8 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
else return boost::shared_ptr<Exchange>();
}
-bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
-bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
+bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
// DataSource interface - used to write persistence data to async store
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 109d8c638e..f6983e8719 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -35,6 +35,7 @@
namespace qpid {
namespace broker {
+class AsyncStore;
class Broker;
class Link;
class Bridge;
@@ -71,8 +72,8 @@ class BrokerReplicator : public broker::Exchange,
// Exchange methods
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 8aba7555d4..cac1fdac29 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -190,8 +190,8 @@ void QueueReplicator::route(Deliverable& msg)
}
// Unused Exchange methods.
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index a2a158539e..8d8a41a5ba 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -71,9 +71,8 @@ class QueueReplicator : public broker::Exchange,
void deactivate(); // Call before dtor
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue
- >, const std::string&, const framing::FieldTable*);
- bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
index c8bfef3785..d1c2b1c34f 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -53,11 +53,12 @@ void ManagementTopicExchange::route(Deliverable& msg)
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args)
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store)
{
if (qmfVersion == 1)
managementAgent->clientAdded(routingKey);
- return TopicExchange::bind(queue, routingKey, args);
+ return TopicExchange::bind(queue, routingKey, args, store);
}
void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index 0d6b6ad50c..92894855fd 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -47,7 +47,8 @@ class ManagementTopicExchange : public virtual TopicExchange
virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ const qpid::framing::FieldTable* args,
+ AsyncStore* const store);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index 22eeff41c5..55800ac464 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -116,7 +116,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
// Federation uses bind for unbind and reorigin comands as well as for binds.
@@ -136,7 +136,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c
}
if (fedOp == fedOpUnbind) {
- return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args);
+ return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args, store);
}
else if (fedOp == fedOpReorigin) {
fedReorigin();
@@ -176,7 +176,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c
return true;
}
-bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const /*store*/)
{
/*
* When called directly, no qpidFedOrigin argument will be
@@ -383,11 +383,11 @@ void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::strin
Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs);
}
-bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args)
+bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store)
{
RWlock::ScopedRlock l(lock);
- if (unbind(queue, bindingKey, args)) {
+ if (unbind(queue, bindingKey, args, store)) {
propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin);
return true;
}
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index a80588c7ab..1fb9439b9b 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -76,9 +76,9 @@ class XmlExchange : public virtual Exchange {
virtual std::string getType() const { return typeName; }
- virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
virtual void route(Deliverable& msg);
@@ -86,7 +86,7 @@ class XmlExchange : public virtual Exchange {
virtual void propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args=0);
- virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args);
+ virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args, AsyncStore* const store);
virtual void fedReorigin();
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 4f18b91b5a..259860e6cd 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -51,12 +51,12 @@ QPID_AUTO_TEST_CASE(testMe)
Queue::shared_ptr queue2(new Queue("queue2", true));
TopicExchange topic("topic");
- topic.bind(queue, "abc", 0);
- topic.bind(queue2, "abc", 0);
+ topic.bind(queue, "abc", 0, 0);
+ topic.bind(queue2, "abc", 0, 0);
DirectExchange direct("direct");
- direct.bind(queue, "abc", 0);
- direct.bind(queue2, "abc", 0);
+ direct.bind(queue, "abc", 0, 0);
+ direct.bind(queue2, "abc", 0, 0);
queue.reset();
queue2.reset();
@@ -78,9 +78,9 @@ QPID_AUTO_TEST_CASE(testIsBound)
string k3("xyz");
FanOutExchange fanout("fanout");
- BOOST_CHECK(fanout.bind(a, "", 0));
- BOOST_CHECK(fanout.bind(b, "", 0));
- BOOST_CHECK(fanout.bind(c, "", 0));
+ BOOST_CHECK(fanout.bind(a, "", 0, 0));
+ BOOST_CHECK(fanout.bind(b, "", 0, 0));
+ BOOST_CHECK(fanout.bind(c, "", 0, 0));
BOOST_CHECK(fanout.isBound(a, 0, 0));
BOOST_CHECK(fanout.isBound(b, 0, 0));
@@ -88,10 +88,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
BOOST_CHECK(!fanout.isBound(d, 0, 0));
DirectExchange direct("direct");
- BOOST_CHECK(direct.bind(a, k1, 0));
- BOOST_CHECK(direct.bind(a, k3, 0));
- BOOST_CHECK(direct.bind(b, k2, 0));
- BOOST_CHECK(direct.bind(c, k1, 0));
+ BOOST_CHECK(direct.bind(a, k1, 0, 0));
+ BOOST_CHECK(direct.bind(a, k3, 0, 0));
+ BOOST_CHECK(direct.bind(b, k2, 0, 0));
+ BOOST_CHECK(direct.bind(c, k1, 0, 0));
BOOST_CHECK(direct.isBound(a, 0, 0));
BOOST_CHECK(direct.isBound(a, &k1, 0));
@@ -106,10 +106,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
BOOST_CHECK(!direct.isBound(d, &k3, 0));
TopicExchange topic("topic");
- BOOST_CHECK(topic.bind(a, k1, 0));
- BOOST_CHECK(topic.bind(a, k3, 0));
- BOOST_CHECK(topic.bind(b, k2, 0));
- BOOST_CHECK(topic.bind(c, k1, 0));
+ BOOST_CHECK(topic.bind(a, k1, 0, 0));
+ BOOST_CHECK(topic.bind(a, k3, 0, 0));
+ BOOST_CHECK(topic.bind(b, k2, 0, 0));
+ BOOST_CHECK(topic.bind(c, k1, 0, 0));
BOOST_CHECK(topic.isBound(a, 0, 0));
BOOST_CHECK(topic.isBound(a, &k1, 0));
@@ -137,10 +137,10 @@ QPID_AUTO_TEST_CASE(testIsBound)
args3.setString("c", "C");
args3.setInt("b", 6);
- headers.bind(a, "", &args1);
- headers.bind(a, "", &args3);
- headers.bind(b, "", &args2);
- headers.bind(c, "", &args1);
+ headers.bind(a, "", &args1, 0);
+ headers.bind(a, "", &args3, 0);
+ headers.bind(b, "", &args2, 0);
+ headers.bind(c, "", &args1, 0);
BOOST_CHECK(headers.isBound(a, 0, 0));
BOOST_CHECK(headers.isBound(a, 0, &args1));
@@ -250,10 +250,10 @@ QPID_AUTO_TEST_CASE(testIVEOption)
Queue::shared_ptr queue2(new Queue("queue2", true));
Queue::shared_ptr queue3(new Queue("queue3", true));
- BOOST_CHECK(direct.bind(queue, "abc", 0));
- BOOST_CHECK(fanout.bind(queue1, "abc", 0));
- BOOST_CHECK(header.bind(queue2, "", &args2));
- BOOST_CHECK(topic.bind(queue3, "abc", 0));
+ BOOST_CHECK(direct.bind(queue, "abc", 0, 0));
+ BOOST_CHECK(fanout.bind(queue1, "abc", 0, 0));
+ BOOST_CHECK(header.bind(queue2, "", &args2, 0));
+ BOOST_CHECK(topic.bind(queue3, "abc", 0, 0));
BOOST_CHECK_EQUAL(1u,queue->getMessageCount());
BOOST_CHECK_EQUAL(1u,queue1->getMessageCount());
diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp
index 40deb59c86..7c8ee4a2d9 100644
--- a/cpp/src/tests/HeadersExchangeTest.cpp
+++ b/cpp/src/tests/HeadersExchangeTest.cpp
@@ -109,7 +109,7 @@ QPID_AUTO_TEST_CASE(testBindNoXMatch)
FieldTable args;
try {
//just checking this doesn't cause assertion etc
- exchange.bind(queue, key, &args);
+ exchange.bind(queue, key, &args, 0);
} catch(qpid::Exception&) {
//expected
}
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 7b7c653029..d86c18c38d 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -100,15 +100,15 @@ QPID_AUTO_TEST_CASE(testBound){
ExchangeRegistry exchanges;
//establish bindings from exchange->queue and notify the queue as it is bound:
Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
- exchange1->bind(queue, key, &args);
+ exchange1->bind(queue, key, &args, 0);
queue->bound(exchange1->getName(), key, args);
Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first;
- exchange2->bind(queue, key, &args);
+ exchange2->bind(queue, key, &args, 0);
queue->bound(exchange2->getName(), key, args);
Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first;
- exchange3->bind(queue, key, &args);
+ exchange3->bind(queue, key, &args, 0);
queue->bound(exchange3->getName(), key, args);
//delete one of the exchanges: