From 8d826b88be96ed9605fc267ccbe5be8bb8c95e76 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 8 Jun 2012 19:09:23 +0000 Subject: QPID-3858: WIP - tidy-up: renamed Mock* to Simple* (as the classes are not empty mocks) git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348194 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/CMakeLists.txt | 6 +- .../asyncPerf/MessageAsyncContext.cpp | 8 +- .../storePerftools/asyncPerf/MessageAsyncContext.h | 16 +- .../storePerftools/asyncPerf/MessageConsumer.cpp | 6 +- .../storePerftools/asyncPerf/MessageConsumer.h | 6 +- .../storePerftools/asyncPerf/MessageProducer.cpp | 12 +- .../storePerftools/asyncPerf/MessageProducer.h | 6 +- .../asyncPerf/MockPersistableMessage.cpp | 107 ------ .../asyncPerf/MockPersistableMessage.h | 78 ---- .../asyncPerf/MockPersistableQueue.cpp | 422 --------------------- .../asyncPerf/MockPersistableQueue.h | 154 -------- .../asyncPerf/MockTransactionContext.cpp | 240 ------------ .../asyncPerf/MockTransactionContext.h | 90 ----- .../tests/storePerftools/asyncPerf/PerfTest.cpp | 4 +- cpp/src/tests/storePerftools/asyncPerf/PerfTest.h | 4 +- .../storePerftools/asyncPerf/QueueAsyncContext.cpp | 10 +- .../storePerftools/asyncPerf/QueueAsyncContext.h | 18 +- .../storePerftools/asyncPerf/QueuedMessage.cpp | 58 +-- .../tests/storePerftools/asyncPerf/QueuedMessage.h | 14 +- .../asyncPerf/SimplePersistableMessage.cpp | 107 ++++++ .../asyncPerf/SimplePersistableMessage.h | 78 ++++ .../asyncPerf/SimplePersistableQueue.cpp | 422 +++++++++++++++++++++ .../asyncPerf/SimplePersistableQueue.h | 153 ++++++++ .../asyncPerf/SimpleTransactionContext.cpp | 240 ++++++++++++ .../asyncPerf/SimpleTransactionContext.h | 90 +++++ .../asyncPerf/TransactionAsyncContext.cpp | 4 +- .../asyncPerf/TransactionAsyncContext.h | 8 +- 27 files changed, 1156 insertions(+), 1205 deletions(-) delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h (limited to 'cpp/src') diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 057879a602..1d04828a49 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -383,12 +383,12 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/MessageConsumer.cpp storePerftools/asyncPerf/MessageDeque.cpp storePerftools/asyncPerf/MessageProducer.cpp - storePerftools/asyncPerf/MockPersistableMessage.cpp - storePerftools/asyncPerf/MockPersistableQueue.cpp - storePerftools/asyncPerf/MockTransactionContext.cpp storePerftools/asyncPerf/PerfTest.cpp storePerftools/asyncPerf/QueueAsyncContext.cpp storePerftools/asyncPerf/QueuedMessage.cpp + storePerftools/asyncPerf/SimplePersistableMessage.cpp + storePerftools/asyncPerf/SimplePersistableQueue.cpp + storePerftools/asyncPerf/SimpleTransactionContext.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp storePerftools/asyncPerf/TransactionAsyncContext.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index ad67bdd32f..8b14bff4f8 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -29,9 +29,9 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -MessageAsyncContext::MessageAsyncContext(boost::shared_ptr msg, +MessageAsyncContext::MessageAsyncContext(boost::shared_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr q) : + boost::shared_ptr q) : m_msg(msg), m_op(op), m_q(q) @@ -55,13 +55,13 @@ MessageAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::shared_ptr +boost::shared_ptr MessageAsyncContext::getMessage() const { return m_msg; } -boost::shared_ptr +boost::shared_ptr MessageAsyncContext::getQueue() const { return m_q; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index 6b41ad9181..9d7958166e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -33,26 +33,26 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableMessage; -class MockPersistableQueue; +class SimplePersistableMessage; +class SimplePersistableQueue; class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: - MessageAsyncContext(boost::shared_ptr msg, + MessageAsyncContext(boost::shared_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr q); + boost::shared_ptr q); virtual ~MessageAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr getMessage() const; - boost::shared_ptr getQueue() const; + boost::shared_ptr getMessage() const; + boost::shared_ptr getQueue() const; void destroy(); private: - boost::shared_ptr m_msg; + boost::shared_ptr m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; - boost::shared_ptr m_q; + boost::shared_ptr m_q; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 89b9b5b9b5..07baabf6d6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,7 +23,7 @@ #include "MessageConsumer.h" -#include "MockPersistableQueue.h" +#include "SimplePersistableQueue.h" #include "TestOptions.h" #include // uint32_t @@ -32,10 +32,10 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockTransactionContext; +class SimpleTransactionContext; MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr queue) : + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), m_queue(queue) {} diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index f11b605244..7f5816b6a0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -30,21 +30,21 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableQueue; +class SimplePersistableQueue; class TestOptions; class MessageConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr queue); + boost::shared_ptr queue); virtual ~MessageConsumer(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - boost::shared_ptr m_queue; + boost::shared_ptr m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 8540ff2b61..e75f773e23 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,8 +23,8 @@ #include "MessageProducer.h" -#include "MockPersistableMessage.h" -#include "MockPersistableQueue.h" +#include "SimplePersistableMessage.h" +#include "SimplePersistableQueue.h" #include "TestOptions.h" #include // uint32_t @@ -33,12 +33,12 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockTransactionContext; +class SimpleTransactionContext; MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, - boost::shared_ptr queue) : + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -51,9 +51,9 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { - boost::shared_ptr txn; + boost::shared_ptr txn; for (uint32_t numMsgs=0; numMsgs msg(new MockPersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::shared_ptr msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); m_queue->deliver(msg); } return 0; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 5cb6d7c5f8..746247e849 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -37,7 +37,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableQueue; +class SimplePersistableQueue; class TestOptions; class MessageProducer @@ -46,7 +46,7 @@ public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, - boost::shared_ptr queue); + boost::shared_ptr queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); @@ -54,7 +54,7 @@ private: const TestOptions& m_perfTestParams; const char* m_msgData; qpid::asyncStore::AsyncStoreImpl* m_store; - boost::shared_ptr m_queue; + boost::shared_ptr m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp deleted file mode 100644 index fc04bc746e..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableMessage.cpp - */ - -#include "MockPersistableMessage.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MockPersistableMessage::MockPersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) -{} - -MockPersistableMessage::~MockPersistableMessage() -{} - -const qpid::broker::MessageHandle& -MockPersistableMessage::getHandle() const -{ - return m_msgHandle; -} - -qpid::broker::MessageHandle& -MockPersistableMessage::getHandle() -{ - return m_msgHandle; -} - -void -MockPersistableMessage::setPersistenceId(uint64_t id) const -{ - m_persistenceId = id; -} - -uint64_t -MockPersistableMessage::getPersistenceId() const -{ - return m_persistenceId; -} - -void -MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putRawData(m_msg); -} - -uint32_t -MockPersistableMessage::encodedSize() const -{ - return static_cast(m_msg.size()); -} - -void -MockPersistableMessage::allDequeuesComplete() -{} - -uint32_t -MockPersistableMessage::encodedHeaderSize() const -{ - return 0; -} - -bool -MockPersistableMessage::isPersistent() const -{ - return m_msgHandle.isValid(); -} - -uint64_t -MockPersistableMessage::getSize() -{ - return m_msg.size(); -} - -void -MockPersistableMessage::write(char* target) -{ - ::memcpy(target, m_msg.data(), m_msg.size()); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h deleted file mode 100644 index abaa93d3e3..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_MockPersistableMessage_h_ -#define tests_storePerftools_asyncPerf_MockPersistableMessage_h_ - -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" - -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MockPersistableQueue; - -class MockPersistableMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource -{ -public: - MockPersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store); - virtual ~MockPersistableMessage(); - const qpid::broker::MessageHandle& getHandle() const; - qpid::broker::MessageHandle& getHandle(); - - // Interface Persistable - virtual void setPersistenceId(uint64_t id) const; - virtual uint64_t getPersistenceId() const; - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - - // Interface PersistableMessage - virtual void allDequeuesComplete(); - virtual uint32_t encodedHeaderSize() const; - virtual bool isPersistent() const; - - // Interface DataStore - virtual uint64_t getSize(); - virtual void write(char* target); - -private: - mutable uint64_t m_persistenceId; - const std::string m_msg; - qpid::broker::MessageHandle m_msgHandle; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfools_asyncPerf_MockPersistableMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp deleted file mode 100644 index 4b22d63fe4..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableQueue.cpp - */ - -#include "MockPersistableQueue.h" - -#include "MessageDeque.h" -#include "MockPersistableMessage.h" -#include "MockTransactionContext.h" -#include "QueueAsyncContext.h" -#include "QueuedMessage.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/AsyncResultHandle.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MockPersistableQueue::MockPersistableQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq) : - qpid::broker::PersistableQueue(), - m_name(name), - m_store(store), - m_resultQueue(arq), - m_asyncOpCounter(0UL), - m_persistenceId(0ULL), - m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_destroyPending(false), - m_destroyed(false), - m_barrier(*this), - m_messages(new MessageDeque()) -{ - if (m_store != 0) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); - } -} - -MockPersistableQueue::~MockPersistableQueue() -{ -// m_store->flush(*this); - // TODO: Make destroying the store a test parameter -// m_store->destroy(*this); -// m_store = 0; -} - -// static -void -MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { -//std::cout << "QQQ MockPersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; - // Handle async success here - switch(qc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->getQueue()->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->getQueue()->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->getQueue()->destroyComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - qc->getQueue()->enqueueComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - qc->getQueue()->dequeueComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } -} - -const qpid::broker::QueueHandle& -MockPersistableQueue::getHandle() const -{ - return m_queueHandle; -} - -qpid::broker::QueueHandle& -MockPersistableQueue::getHandle() -{ - return m_queueHandle; -} - -qpid::asyncStore::AsyncStoreImpl* -MockPersistableQueue::getStore() -{ - return m_store; -} - -void -MockPersistableQueue::asyncCreate() -{ - if (m_store) { - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qpid::asyncStore::AsyncOperation::QUEUE_CREATE, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitCreate(m_queueHandle, - this, - qac); - ++m_asyncOpCounter; - } -} - -void -MockPersistableQueue::asyncDestroy(const bool deleteQueue) -{ - m_destroyPending = true; - if (m_store) { - if (deleteQueue) { - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, - qac); - ++m_asyncOpCounter; - } - m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); - } -} - -void -MockPersistableQueue::deliver(boost::shared_ptr msg) -{ - QueuedMessage qm(this, msg); - if(enqueue((MockTransactionContext*)0, qm)) { - push(qm); - } -} - -bool -MockPersistableQueue::dispatch() -{ - QueuedMessage qm; - if (m_messages->consume(qm)) { - return dequeue((MockTransactionContext*)0, qm); - } - return false; -} - -bool -MockPersistableQueue::enqueue(MockTransactionContext* ctxt, - QueuedMessage& qm) -{ - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(ctxt, qm); - } - return false; -} - -bool -MockPersistableQueue::dequeue(MockTransactionContext* ctxt, - QueuedMessage& qm) -{ - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(ctxt, qm); - } - return false; -} - -void -MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putShortString(m_name); -} - -uint32_t -MockPersistableQueue::encodedSize() const -{ - return m_name.size() + 1; -} - -uint64_t -MockPersistableQueue::getPersistenceId() const -{ - return m_persistenceId; -} - -void -MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const -{ - m_persistenceId = persistenceId; -} - -void -MockPersistableQueue::flush() -{ - //if(m_store) m_store->flush(*this); -} - -const std::string& -MockPersistableQueue::getName() const -{ - return m_name; -} - -void -MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ - if (externalQueueStore != inst && externalQueueStore) - delete externalQueueStore; - externalQueueStore = inst; -} - -uint64_t -MockPersistableQueue::getSize() -{ - return m_persistableData.size(); -} - -void -MockPersistableQueue::write(char* target) -{ - ::memcpy(target, m_persistableData.data(), m_persistableData.size()); -} - -// --- Members & methods in msg handling path from qpid::Queue --- - -// protected -MockPersistableQueue::UsageBarrier::UsageBarrier(MockPersistableQueue& q) : - m_parent(q), - m_count(0) -{} - -// protected -bool -MockPersistableQueue::UsageBarrier::acquire() -{ - qpid::sys::Monitor::ScopedLock l(m_monitor); - if (m_parent.m_destroyed) { - return false; - } else { - ++m_count; - return true; - } -} - -// protected -void MockPersistableQueue::UsageBarrier::release() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - if (--m_count == 0) { - m_monitor.notifyAll(); - } -} - -// protected -void MockPersistableQueue::UsageBarrier::destroy() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - m_parent.m_destroyed = true; - while (m_count) { - m_monitor.wait(); - } -} - -// protected -MockPersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : - m_barrier(b), - m_acquired(m_barrier.acquire()) -{} - -// protected -MockPersistableQueue::ScopedUse::~ScopedUse() -{ - if (m_acquired) { - m_barrier.release(); - } -} - -// private -void -MockPersistableQueue::push(QueuedMessage& qm, - bool /*isRecovery*/) -{ - QueuedMessage removed; - m_messages->push(qm, removed); -} - -// --- End Members & methods in msg handling path from qpid::Queue --- - -// private -bool -MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, - QueuedMessage& qm) -{ - qm.payload()->setPersistenceId(m_store->getNextRid()); -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitEnqueue(qm.enqHandle(), - txn->getHandle(), - qac); - ++m_asyncOpCounter; - return true; -} - -// private -bool -MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, - QueuedMessage& qm) -{ -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitDequeue(qm.enqHandle(), - txn->getHandle(), - qac); - ++m_asyncOpCounter; - return true; -} - -// private -void -MockPersistableQueue::destroyCheck(const std::string& opDescr) const -{ - if (m_destroyPending || m_destroyed) { - std::ostringstream oss; - oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; - throw qpid::Exception(oss.str()); - } -} - -// private -void -MockPersistableQueue::createComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -// private -void -MockPersistableQueue::flushComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -// private -void -MockPersistableQueue::destroyComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - m_destroyed = true; -} - -// private -void -MockPersistableQueue::enqueueComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -// private -void -MockPersistableQueue::dequeueComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h deleted file mode 100644 index 860d98b867..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockPersistableQueue.h - */ - -#ifndef tests_storePerftools_asyncPerf_MockPersistableQueue_h_ -#define tests_storePerftools_asyncPerf_MockPersistableQueue_h_ - -#include "AtomicCounter.h" // AsyncOpCounter - -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Monitor.h" - -#include -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace broker { -class AsyncResultQueue; -} -namespace framing { -class FieldTable; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class Messages; -class MockPersistableMessage; -class MockPersistableQueue; -class MockTransactionContext; -class QueueAsyncContext; -class QueuedMessage; - -class MockPersistableQueue : public boost::enable_shared_from_this, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource -{ -public: - MockPersistableQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq); - virtual ~MockPersistableQueue(); - - static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); - const qpid::broker::QueueHandle& getHandle() const; - qpid::broker::QueueHandle& getHandle(); - qpid::asyncStore::AsyncStoreImpl* getStore(); - - void asyncCreate(); - void asyncDestroy(const bool deleteQueue); - - // --- Methods in msg handling path from qpid::Queue --- - void deliver(boost::shared_ptr msg); - bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param - bool enqueue(MockTransactionContext* ctxt, - QueuedMessage& qm); - bool dequeue(MockTransactionContext* ctxt, - QueuedMessage& qm); - - // --- Interface qpid::broker::Persistable --- - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - virtual uint64_t getPersistenceId() const; - virtual void setPersistenceId(uint64_t persistenceId) const; - - // --- Interface qpid::broker::PersistableQueue --- - virtual void flush(); - virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - - // --- Interface DataStore --- - virtual uint64_t getSize(); - virtual void write(char* target); - -private: - const std::string m_name; - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::AsyncResultQueue& m_resultQueue; - AsyncOpCounter m_asyncOpCounter; - mutable uint64_t m_persistenceId; - std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; - bool m_destroyPending; - bool m_destroyed; - - // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier - { - MockPersistableQueue& m_parent; - uint32_t m_count; - qpid::sys::Monitor m_monitor; - UsageBarrier(MockPersistableQueue& q); - bool acquire(); - void release(); - void destroy(); - }; - struct ScopedUse - { - UsageBarrier& m_barrier; - const bool m_acquired; - ScopedUse(UsageBarrier& b); - ~ScopedUse(); - }; - UsageBarrier m_barrier; - std::auto_ptr m_messages; - void push(QueuedMessage& qm, - bool isRecovery = false); - - // -- Async ops --- - bool asyncEnqueue(MockTransactionContext* txn, - QueuedMessage& qm); - bool asyncDequeue(MockTransactionContext* txn, - QueuedMessage& qm); - - // --- Async op counter --- - void destroyCheck(const std::string& opDescr) const; - - // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const boost::shared_ptr qc); - void flushComplete(const boost::shared_ptr qc); - void destroyComplete(const boost::shared_ptr qc); - void enqueueComplete(const boost::shared_ptr qc); - void dequeueComplete(const boost::shared_ptr qc); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_MockPersistableQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp deleted file mode 100644 index 7e0db4a154..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockTransactionContext.cpp - */ - -#include "MockTransactionContext.h" - -#include "TransactionAsyncContext.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MockTransactionContext::MockTransactionContext(const std::string& xid) : - qpid::broker::TransactionContext(), - m_xid(xid), - m_tpcFlag(!xid.empty()), - m_store(0), - m_txnHandle(0), - m_prepared(false), - m_enqueuedMsgs() -{ - if (!m_tpcFlag) { - setLocalXid(); - } -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; -} - -MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid) : - m_store(store), - m_prepared(false), - m_enqueuedMsgs() -{ -//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; - if (m_store != 0) { - m_txnHandle = store->createTxnHandle(xid); - } -} - -MockTransactionContext::~MockTransactionContext() -{} - -// static -/* -void -MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc) -{ - if (bc && res) { - TransactionAsyncContext* tac = dynamic_cast(bc); - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(tac->getOpCode()) { - case qpid::asyncStore::AsyncOperation::TXN_PREPARE: - tac->getTransactionContext()->prepareComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_COMMIT: - tac->getTransactionContext()->commitComplete(tac); - break; - case qpid::asyncStore::AsyncOperation::TXN_ABORT: - tac->getTransactionContext()->abortComplete(tac); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } - if (bc) delete bc; - if (res) delete res; -} -*/ - -const qpid::broker::TxnHandle& -MockTransactionContext::getHandle() const -{ - return m_txnHandle; -} - -qpid::broker::TxnHandle& -MockTransactionContext::getHandle() -{ - return m_txnHandle; -} - -bool -MockTransactionContext::is2pc() const -{ - return m_tpcFlag; -} - -const std::string& -MockTransactionContext::getXid() const -{ - return m_xid; -} - -void -MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); -} - -void -MockTransactionContext::prepare() -{ - if (m_tpcFlag) { - localPrepare(); - m_prepared = true; - } - std::ostringstream oss; - oss << "MockTransactionContext::prepare(): xid=\"" << getXid() - << "\": Transaction Error: called prepare() on local transaction"; - throw qpid::Exception(oss.str()); -} - -void -MockTransactionContext::abort() -{ - // TODO: Check the following XA transaction semantics: - // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. - if (!m_prepared) { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitAbort(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); - } -//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -void -MockTransactionContext::commit() -{ - if (is2pc()) { - if (!m_prepared) { - std::ostringstream oss; - oss << "MockTransactionContext::abort(): xid=\"" << getXid() - << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; - throw qpid::Exception(oss.str()); - } - } else { - localPrepare(); - } - if (m_store != 0) { -// m_store->submitCommit(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); - } -//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - - -// private -void -MockTransactionContext::localPrepare() -{ - if (m_store != 0) { -// m_store->submitPrepare(m_txnHandle, -// &handleAsyncResult, -// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); - } -//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; -} - -// private -void -MockTransactionContext::setLocalXid() -{ - uuid_t uuid; - // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? - // If this race condition affects the randomness of the UUID, then there could be a problem here. - ::uuid_generate_random(uuid); - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); -} - -// private -void -MockTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) -{ - qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); -// while (!m_enqueuedMsgs.empty()) { -// m_enqueuedMsgs.front()->clearTransaction(); -// m_enqueuedMsgs.pop_front(); -// } -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; -// assert(tc->getTransactionContext().get() == this); -} - - -// private -void -MockTransactionContext::abortComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - - -// private -void -MockTransactionContext::commitComplete(const TransactionAsyncContext* tc) -{ -//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext().get() == this); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h deleted file mode 100644 index c57a37e084..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MockTransactionContext.h - */ - -#ifndef tests_storePerftools_asyncPerf_MockTransactionContext_h_ -#define tests_storePerftools_asyncPerf_MockTransactionContext_h_ - -#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext -#include "qpid/broker/TxnHandle.h" -#include "qpid/sys/Mutex.h" - -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace broker { -class AsyncResult; -class BrokerAsyncContext; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; -class TransactionAsyncContext; - -class MockTransactionContext : public qpid::broker::TransactionContext -{ -public: - MockTransactionContext(const std::string& xid = std::string()); - MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, - const std::string& xid = std::string()); - virtual ~MockTransactionContext(); -// static void handleAsyncResult(const qpid::broker::AsyncResult* res, -// qpid::broker::BrokerAsyncContext* bc); - - const qpid::broker::TxnHandle& getHandle() const; - qpid::broker::TxnHandle& getHandle(); - bool is2pc() const; - const std::string& getXid() const; - void addEnqueuedMsg(QueuedMessage* qm); - - void prepare(); - void abort(); - void commit(); - -private: - std::string m_xid; - bool m_tpcFlag; - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::TxnHandle m_txnHandle; - bool m_prepared; - std::deque m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - - void localPrepare(); - void setLocalXid(); - - // --- Ascnc op completions (called through handleAsyncResult) --- - void prepareComplete(const TransactionAsyncContext* tc); - void abortComplete(const TransactionAsyncContext* tc); - void commitComplete(const TransactionAsyncContext* tc); - -}; - -}}} // namespace tests:storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_MockTransactionContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 0c91b29a1e..fbfebbac2b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,7 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "MockPersistableQueue.h" +#include "SimplePersistableQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -138,7 +138,7 @@ PerfTest::prepareQueues() for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr mpq(new SimplePersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 8957d05792..e42db090d2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -48,7 +48,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableQueue; +class SimplePersistableQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -72,7 +72,7 @@ private: qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque > m_queueList; + std::deque > m_queueList; std::deque > m_producers; std::deque > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 513175ab41..a389f86739 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -29,7 +29,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : @@ -41,8 +41,8 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, assert(m_q.get() != 0); } -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, - boost::shared_ptr msg, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, + boost::shared_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : @@ -71,13 +71,13 @@ QueueAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::shared_ptr +boost::shared_ptr QueueAsyncContext::getQueue() const { return m_q; } -boost::shared_ptr +boost::shared_ptr QueueAsyncContext::getMessage() const { return m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index b7a2205307..982aef3874 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -35,34 +35,34 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableMessage; -class MockPersistableQueue; +class SimplePersistableMessage; +class SimplePersistableQueue; class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: - QueueAsyncContext(boost::shared_ptr q, + QueueAsyncContext(boost::shared_ptr q, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); - QueueAsyncContext(boost::shared_ptr q, - boost::shared_ptr msg, + QueueAsyncContext(boost::shared_ptr q, + boost::shared_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr getQueue() const; - boost::shared_ptr getMessage() const; + boost::shared_ptr getQueue() const; + boost::shared_ptr getMessage() const; qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; qpid::broker::AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const; void destroy(); private: - boost::shared_ptr m_q; - boost::shared_ptr m_msg; + boost::shared_ptr m_q; + boost::shared_ptr m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; qpid::broker::AsyncResultCallback m_rcb; qpid::broker::AsyncResultQueue* const m_arq; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 802279bbf9..8530051eec 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -23,11 +23,10 @@ #include "QueuedMessage.h" -#include "MockPersistableMessage.h" -#include "MockPersistableQueue.h" +#include "SimplePersistableMessage.h" +#include "SimplePersistableQueue.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -//#include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { @@ -37,8 +36,8 @@ QueuedMessage::QueuedMessage() : m_queue(0) {} -QueuedMessage::QueuedMessage(MockPersistableQueue* q, - boost::shared_ptr msg) : +QueuedMessage::QueuedMessage(SimplePersistableQueue* q, + boost::shared_ptr msg) : m_queue(q), m_msg(msg), m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) @@ -62,7 +61,7 @@ QueuedMessage::operator=(const QueuedMessage& rhs) return *this; } -boost::shared_ptr +boost::shared_ptr QueuedMessage::payload() const { return m_msg; @@ -80,51 +79,4 @@ QueuedMessage::enqHandle() return m_enqHandle; } -/* -QueuedMessage::QueuedMessage(boost::shared_ptr msg, - qpid::broker::EnqueueHandle& enqHandle, - boost::shared_ptr txn) : - m_msg(msg), - m_enqHandle(enqHandle), - m_txn(txn) -{ - if (txn.get()) { - txn->addEnqueuedMsg(this); - } -} - -QueuedMessage::~QueuedMessage() -{} - -boost::shared_ptr -QueuedMessage::getMessage() const -{ - return m_msg; -} - -qpid::broker::EnqueueHandle -QueuedMessage::getEnqueueHandle() const -{ - return m_enqHandle; -} - -boost::shared_ptr -QueuedMessage::getTransactionContext() const -{ - return m_txn; -} - -bool -QueuedMessage::isTransactional() const -{ - return m_txn.get() != 0; -} - -void -QueuedMessage::clearTransaction() -{ - m_txn.reset(static_cast(0)); -} -*/ - }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index f7b44e046b..b1e8a7226a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -32,25 +32,25 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockPersistableMessage; -class MockPersistableQueue; +class SimplePersistableMessage; +class SimplePersistableQueue; class QueuedMessage { public: QueuedMessage(); - QueuedMessage(MockPersistableQueue* q, - boost::shared_ptr msg); + QueuedMessage(SimplePersistableQueue* q, + boost::shared_ptr msg); QueuedMessage(const QueuedMessage& qm); ~QueuedMessage(); QueuedMessage& operator=(const QueuedMessage& rhs); - boost::shared_ptr payload() const; + boost::shared_ptr payload() const; const qpid::broker::EnqueueHandle& enqHandle() const; qpid::broker::EnqueueHandle& enqHandle(); private: - MockPersistableQueue* m_queue; - boost::shared_ptr m_msg; + SimplePersistableQueue* m_queue; + boost::shared_ptr m_msg; qpid::broker::EnqueueHandle m_enqHandle; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp new file mode 100644 index 0000000000..58aacabdcd --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimplePersistableMessage.cpp + */ + +#include "SimplePersistableMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +SimplePersistableMessage::SimplePersistableMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) +{} + +SimplePersistableMessage::~SimplePersistableMessage() +{} + +const qpid::broker::MessageHandle& +SimplePersistableMessage::getHandle() const +{ + return m_msgHandle; +} + +qpid::broker::MessageHandle& +SimplePersistableMessage::getHandle() +{ + return m_msgHandle; +} + +void +SimplePersistableMessage::setPersistenceId(uint64_t id) const +{ + m_persistenceId = id; +} + +uint64_t +SimplePersistableMessage::getPersistenceId() const +{ + return m_persistenceId; +} + +void +SimplePersistableMessage::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putRawData(m_msg); +} + +uint32_t +SimplePersistableMessage::encodedSize() const +{ + return static_cast(m_msg.size()); +} + +void +SimplePersistableMessage::allDequeuesComplete() +{} + +uint32_t +SimplePersistableMessage::encodedHeaderSize() const +{ + return 0; +} + +bool +SimplePersistableMessage::isPersistent() const +{ + return m_msgHandle.isValid(); +} + +uint64_t +SimplePersistableMessage::getSize() +{ + return m_msg.size(); +} + +void +SimplePersistableMessage::write(char* target) +{ + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h new file mode 100644 index 0000000000..f6258822ea --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimplePersistableMessage.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ +#define tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ + +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/PersistableMessage.h" + +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class SimplePersistableQueue; + +class SimplePersistableMessage: public qpid::broker::PersistableMessage, + public qpid::broker::DataSource +{ +public: + SimplePersistableMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store); + virtual ~SimplePersistableMessage(); + const qpid::broker::MessageHandle& getHandle() const; + qpid::broker::MessageHandle& getHandle(); + + // Interface Persistable + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // Interface PersistableMessage + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // Interface DataStore + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + mutable uint64_t m_persistenceId; + const std::string m_msg; + qpid::broker::MessageHandle m_msgHandle; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp new file mode 100644 index 0000000000..198c43b087 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimplePersistableQueue.cpp + */ + +#include "SimplePersistableQueue.h" + +#include "MessageDeque.h" +#include "SimplePersistableMessage.h" +#include "SimpleTransactionContext.h" +#include "QueueAsyncContext.h" +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +SimplePersistableQueue::SimplePersistableQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq) : + qpid::broker::PersistableQueue(), + m_name(name), + m_store(store), + m_resultQueue(arq), + m_asyncOpCounter(0UL), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new MessageDeque()) +{ + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } +} + +SimplePersistableQueue::~SimplePersistableQueue() +{ +// m_store->flush(*this); + // TODO: Make destroying the store a test parameter +// m_store->destroy(*this); +// m_store = 0; +} + +// static +void +SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) +{ + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { +//std::cout << "QQQ SimplePersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; + // Handle async success here + switch(qc->getOpCode()) { + case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: + qc->getQueue()->createComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: + qc->getQueue()->flushComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: + qc->getQueue()->destroyComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + qc->getQueue()->enqueueComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + qc->getQueue()->dequeueComplete(qc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::SimplePersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); + throw qpid::Exception(oss.str()); + }; + } + } +} + +const qpid::broker::QueueHandle& +SimplePersistableQueue::getHandle() const +{ + return m_queueHandle; +} + +qpid::broker::QueueHandle& +SimplePersistableQueue::getHandle() +{ + return m_queueHandle; +} + +qpid::asyncStore::AsyncStoreImpl* +SimplePersistableQueue::getStore() +{ + return m_store; +} + +void +SimplePersistableQueue::asyncCreate() +{ + if (m_store) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_CREATE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCreate(m_queueHandle, + this, + qac); + ++m_asyncOpCounter; + } +} + +void +SimplePersistableQueue::asyncDestroy(const bool deleteQueue) +{ + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitDestroy(m_queueHandle, + qac); + ++m_asyncOpCounter; + } + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); + } +} + +void +SimplePersistableQueue::deliver(boost::shared_ptr msg) +{ + QueuedMessage qm(this, msg); + if(enqueue((SimpleTransactionContext*)0, qm)) { + push(qm); + } +} + +bool +SimplePersistableQueue::dispatch() +{ + QueuedMessage qm; + if (m_messages->consume(qm)) { + return dequeue((SimpleTransactionContext*)0, qm); + } + return false; +} + +bool +SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, + QueuedMessage& qm) +{ + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(ctxt, qm); + } + return false; +} + +bool +SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, + QueuedMessage& qm) +{ + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(ctxt, qm); + } + return false; +} + +void +SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(m_name); +} + +uint32_t +SimplePersistableQueue::encodedSize() const +{ + return m_name.size() + 1; +} + +uint64_t +SimplePersistableQueue::getPersistenceId() const +{ + return m_persistenceId; +} + +void +SimplePersistableQueue::setPersistenceId(uint64_t persistenceId) const +{ + m_persistenceId = persistenceId; +} + +void +SimplePersistableQueue::flush() +{ + //if(m_store) m_store->flush(*this); +} + +const std::string& +SimplePersistableQueue::getName() const +{ + return m_name; +} + +void +SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +{ + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +SimplePersistableQueue::getSize() +{ + return m_persistableData.size(); +} + +void +SimplePersistableQueue::write(char* target) +{ + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +SimplePersistableQueue::UsageBarrier::UsageBarrier(SimplePersistableQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +SimplePersistableQueue::UsageBarrier::acquire() +{ + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void SimplePersistableQueue::UsageBarrier::release() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void SimplePersistableQueue::UsageBarrier::destroy() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +SimplePersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +SimplePersistableQueue::ScopedUse::~ScopedUse() +{ + if (m_acquired) { + m_barrier.release(); + } +} + +// private +void +SimplePersistableQueue::push(QueuedMessage& qm, + bool /*isRecovery*/) +{ + QueuedMessage removed; + m_messages->push(qm, removed); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// private +bool +SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, + QueuedMessage& qm) +{ + qm.payload()->setPersistenceId(m_store->getNextRid()); +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitEnqueue(qm.enqHandle(), + txn->getHandle(), + qac); + ++m_asyncOpCounter; + return true; +} + +// private +bool +SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, + QueuedMessage& qm) +{ +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitDequeue(qm.enqHandle(), + txn->getHandle(), + qac); + ++m_asyncOpCounter; + return true; +} + +// private +void +SimplePersistableQueue::destroyCheck(const std::string& opDescr) const +{ + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + +// private +void +SimplePersistableQueue::createComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +// private +void +SimplePersistableQueue::flushComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +// private +void +SimplePersistableQueue::destroyComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + m_destroyed = true; +} + +// private +void +SimplePersistableQueue::enqueueComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +// private +void +SimplePersistableQueue::dequeueComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h new file mode 100644 index 0000000000..1366c7772b --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimplePersistableQueue.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ +#define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ + +#include "AtomicCounter.h" // AsyncOpCounter + +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Monitor.h" + +#include +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace broker { +class AsyncResultQueue; +} +namespace framing { +class FieldTable; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class Messages; +class SimplePersistableMessage; +class SimpleTransactionContext; +class QueueAsyncContext; +class QueuedMessage; + +class SimplePersistableQueue : public boost::enable_shared_from_this, + public qpid::broker::PersistableQueue, + public qpid::broker::DataSource +{ +public: + SimplePersistableQueue(const std::string& name, + const qpid::framing::FieldTable& args, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq); + virtual ~SimplePersistableQueue(); + + static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); + const qpid::broker::QueueHandle& getHandle() const; + qpid::broker::QueueHandle& getHandle(); + qpid::asyncStore::AsyncStoreImpl* getStore(); + + void asyncCreate(); + void asyncDestroy(const bool deleteQueue); + + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::shared_ptr msg); + bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param + bool enqueue(SimpleTransactionContext* ctxt, + QueuedMessage& qm); + bool dequeue(SimpleTransactionContext* ctxt, + QueuedMessage& qm); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + + // --- Interface DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + const std::string m_name; + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; + AsyncOpCounter m_asyncOpCounter; + mutable uint64_t m_persistenceId; + std::string m_persistableData; + qpid::broker::QueueHandle m_queueHandle; + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier + { + SimplePersistableQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(SimplePersistableQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse + { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr m_messages; + void push(QueuedMessage& qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(SimpleTransactionContext* txn, + QueuedMessage& qm); + bool asyncDequeue(SimpleTransactionContext* txn, + QueuedMessage& qm); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- + void createComplete(const boost::shared_ptr qc); + void flushComplete(const boost::shared_ptr qc); + void destroyComplete(const boost::shared_ptr qc); + void enqueueComplete(const boost::shared_ptr qc); + void dequeueComplete(const boost::shared_ptr qc); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp new file mode 100644 index 0000000000..2aea14bc21 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.cpp @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTransactionContext.cpp + */ + +#include "SimpleTransactionContext.h" + +#include "TransactionAsyncContext.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +#include + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +SimpleTransactionContext::SimpleTransactionContext(const std::string& xid) : + qpid::broker::TransactionContext(), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_store(0), + m_txnHandle(0), + m_prepared(false), + m_enqueuedMsgs() +{ + if (!m_tpcFlag) { + setLocalXid(); + } +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + +SimpleTransactionContext::SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, + const std::string& xid) : + m_store(store), + m_prepared(false), + m_enqueuedMsgs() +{ +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; + if (m_store != 0) { + m_txnHandle = store->createTxnHandle(xid); + } +} + +SimpleTransactionContext::~SimpleTransactionContext() +{} + +// static +/* +void +SimpleTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerAsyncContext* bc) +{ + if (bc && res) { + TransactionAsyncContext* tac = dynamic_cast(bc); + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Transaction xid=\"" << tac->getTransactionContext()->getXid() << "\": Operation " << tac->getOpStr() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(tac->getOpCode()) { + case qpid::asyncStore::AsyncOperation::TXN_PREPARE: + tac->getTransactionContext()->prepareComplete(tac); + break; + case qpid::asyncStore::AsyncOperation::TXN_COMMIT: + tac->getTransactionContext()->commitComplete(tac); + break; + case qpid::asyncStore::AsyncOperation::TXN_ABORT: + tac->getTransactionContext()->abortComplete(tac); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::SimpleTransactionContext::handleAsyncResult(): Unknown async operation: " << tac->getOpCode(); + throw qpid::Exception(oss.str()); + }; + } + } + if (bc) delete bc; + if (res) delete res; +} +*/ + +const qpid::broker::TxnHandle& +SimpleTransactionContext::getHandle() const +{ + return m_txnHandle; +} + +qpid::broker::TxnHandle& +SimpleTransactionContext::getHandle() +{ + return m_txnHandle; +} + +bool +SimpleTransactionContext::is2pc() const +{ + return m_tpcFlag; +} + +const std::string& +SimpleTransactionContext::getXid() const +{ + return m_xid; +} + +void +SimpleTransactionContext::addEnqueuedMsg(QueuedMessage* qm) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); +} + +void +SimpleTransactionContext::prepare() +{ + if (m_tpcFlag) { + localPrepare(); + m_prepared = true; + } + std::ostringstream oss; + oss << "SimpleTransactionContext::prepare(): xid=\"" << getXid() + << "\": Transaction Error: called prepare() on local transaction"; + throw qpid::Exception(oss.str()); +} + +void +SimpleTransactionContext::abort() +{ + // TODO: Check the following XA transaction semantics: + // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. + if (!m_prepared) { + localPrepare(); + } + if (m_store != 0) { +// m_store->submitAbort(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); + } +//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +void +SimpleTransactionContext::commit() +{ + if (is2pc()) { + if (!m_prepared) { + std::ostringstream oss; + oss << "SimpleTransactionContext::abort(): xid=\"" << getXid() + << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; + throw qpid::Exception(oss.str()); + } + } else { + localPrepare(); + } + if (m_store != 0) { +// m_store->submitCommit(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); + } +//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + + +// private +void +SimpleTransactionContext::localPrepare() +{ + if (m_store != 0) { +// m_store->submitPrepare(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); + } +//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +// private +void +SimpleTransactionContext::setLocalXid() +{ + uuid_t uuid; + // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? + // If this race condition affects the randomness of the UUID, then there could be a problem here. + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +} + +// private +void +SimpleTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) +{ + qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); +// while (!m_enqueuedMsgs.empty()) { +// m_enqueuedMsgs.front()->clearTransaction(); +// m_enqueuedMsgs.pop_front(); +// } +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; +// assert(tc->getTransactionContext().get() == this); +} + + +// private +void +SimpleTransactionContext::abortComplete(const TransactionAsyncContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; + assert(tc->getTransactionContext().get() == this); +} + + +// private +void +SimpleTransactionContext::commitComplete(const TransactionAsyncContext* tc) +{ +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; + assert(tc->getTransactionContext().get() == this); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h new file mode 100644 index 0000000000..49fee9c720 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleTransactionContext.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTransactionContext.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ +#define tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ + +#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext +#include "qpid/broker/TxnHandle.h" +#include "qpid/sys/Mutex.h" + +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace broker { +class AsyncResult; +class BrokerAsyncContext; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class TransactionAsyncContext; + +class SimpleTransactionContext : public qpid::broker::TransactionContext +{ +public: + SimpleTransactionContext(const std::string& xid = std::string()); + SimpleTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, + const std::string& xid = std::string()); + virtual ~SimpleTransactionContext(); +// static void handleAsyncResult(const qpid::broker::AsyncResult* res, +// qpid::broker::BrokerAsyncContext* bc); + + const qpid::broker::TxnHandle& getHandle() const; + qpid::broker::TxnHandle& getHandle(); + bool is2pc() const; + const std::string& getXid() const; + void addEnqueuedMsg(QueuedMessage* qm); + + void prepare(); + void abort(); + void commit(); + +private: + std::string m_xid; + bool m_tpcFlag; + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::TxnHandle m_txnHandle; + bool m_prepared; + std::deque m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + + void localPrepare(); + void setLocalXid(); + + // --- Ascnc op completions (called through handleAsyncResult) --- + void prepareComplete(const TransactionAsyncContext* tc); + void abortComplete(const TransactionAsyncContext* tc); + void commitComplete(const TransactionAsyncContext* tc); + +}; + +}}} // namespace tests:storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_SimpleTransactionContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp index 51a5791403..f150a34027 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp @@ -29,7 +29,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr tc, +TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr tc, const qpid::asyncStore::AsyncOperation::opCode op): m_tc(tc), m_op(op) @@ -52,7 +52,7 @@ TransactionAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::shared_ptr +boost::shared_ptr TransactionAsyncContext::getTransactionContext() const { return m_tc; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h index b1e74d6844..186a8141cf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h @@ -33,21 +33,21 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MockTransactionContext; +class SimpleTransactionContext; class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext { public: - TransactionAsyncContext(boost::shared_ptr tc, + TransactionAsyncContext(boost::shared_ptr tc, const qpid::asyncStore::AsyncOperation::opCode op); virtual ~TransactionAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr getTransactionContext() const; + boost::shared_ptr getTransactionContext() const; void destroy(); private: - boost::shared_ptr m_tc; + boost::shared_ptr m_tc; const qpid::asyncStore::AsyncOperation::opCode m_op; }; -- cgit v1.2.1