From b95f9427ede4a2045ac6424a6341de9185a13602 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 21 Jun 2012 12:09:00 +0000 Subject: QPID-3858: WIP: New classes for transactional consumption of messages git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1352509 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/TxnOp.h | 2 + cpp/src/tests/CMakeLists.txt | 6 +- .../tests/storePerftools/asyncPerf/Deliverable.h | 8 +- .../storePerftools/asyncPerf/DeliveryRecord.cpp | 82 ++++ .../storePerftools/asyncPerf/DeliveryRecord.h | 56 +++ .../asyncPerf/MessageAsyncContext.cpp | 10 +- .../storePerftools/asyncPerf/MessageAsyncContext.h | 16 +- .../storePerftools/asyncPerf/MessageConsumer.cpp | 27 +- .../storePerftools/asyncPerf/MessageConsumer.h | 18 +- .../storePerftools/asyncPerf/MessageProducer.cpp | 8 +- .../storePerftools/asyncPerf/MessageProducer.h | 6 +- .../tests/storePerftools/asyncPerf/PerfTest.cpp | 6 +- cpp/src/tests/storePerftools/asyncPerf/PerfTest.h | 4 +- .../storePerftools/asyncPerf/QueueAsyncContext.cpp | 12 +- .../storePerftools/asyncPerf/QueueAsyncContext.h | 18 +- .../storePerftools/asyncPerf/QueuedMessage.cpp | 16 +- .../tests/storePerftools/asyncPerf/QueuedMessage.h | 15 +- .../storePerftools/asyncPerf/SimpleMessage.cpp | 113 +++++ .../tests/storePerftools/asyncPerf/SimpleMessage.h | 79 ++++ .../asyncPerf/SimplePersistableMessage.cpp | 113 ----- .../asyncPerf/SimplePersistableMessage.h | 79 ---- .../asyncPerf/SimplePersistableQueue.cpp | 456 --------------------- .../asyncPerf/SimplePersistableQueue.h | 157 ------- .../tests/storePerftools/asyncPerf/SimpleQueue.cpp | 456 +++++++++++++++++++++ .../tests/storePerftools/asyncPerf/SimpleQueue.h | 157 +++++++ .../tests/storePerftools/asyncPerf/TxnAccept.cpp | 52 +++ cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h | 46 +++ .../tests/storePerftools/asyncPerf/TxnPublish.cpp | 10 +- .../tests/storePerftools/asyncPerf/TxnPublish.h | 14 +- 29 files changed, 1161 insertions(+), 881 deletions(-) create mode 100644 cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp create mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h index e98429535e..1626e30ccd 100644 --- a/cpp/src/qpid/broker/TxnOp.h +++ b/cpp/src/qpid/broker/TxnOp.h @@ -27,6 +27,8 @@ namespace qpid { namespace broker { +class TxnHandle; + class TxnOp{ public: virtual ~TxnOp() {} diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index fbe4cfbd7d..ad9a518867 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -380,6 +380,7 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES storePerftools/asyncPerf/Deliverable.cpp + storePerftools/asyncPerf/DeliveryRecord.cpp storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp storePerftools/asyncPerf/MessageDeque.cpp @@ -387,10 +388,11 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/PerfTest.cpp storePerftools/asyncPerf/QueueAsyncContext.cpp storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimplePersistableMessage.cpp - storePerftools/asyncPerf/SimplePersistableQueue.cpp + storePerftools/asyncPerf/SimpleMessage.cpp + storePerftools/asyncPerf/SimpleQueue.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp + storePerftools/asyncPerf/TxnAccept.cpp storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h index 57e130eeba..990d53a199 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h @@ -31,8 +31,8 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class Deliverable { @@ -41,8 +41,8 @@ public: virtual ~Deliverable(); virtual uint64_t contentSize() = 0; - virtual void deliverTo(const boost::shared_ptr& queue) = 0; - virtual SimplePersistableMessage& getMessage() = 0; + virtual void deliverTo(const boost::shared_ptr& queue) = 0; + virtual SimpleMessage& getMessage() = 0; virtual bool isDelivered() const; protected: diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp new file mode 100644 index 0000000000..7a0224a9b5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DeliveryRecord.cpp + */ + +#include "DeliveryRecord.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +DeliveryRecord::DeliveryRecord(const QueuedMessage& qm, + bool accepted) : + m_queuedMessage(qm), + m_accepted(accepted), + m_ended(accepted) +{} + +DeliveryRecord::~DeliveryRecord() +{} + +bool +DeliveryRecord::accept(qpid::broker::TxnHandle* txn) +{ + if (!m_ended) { + assert(m_queuedMessage.getQueue()); + m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} + +bool +DeliveryRecord::isAccepted() const +{ + return m_accepted; +} + +bool +DeliveryRecord::setEnded() +{ + m_ended = true; + m_queuedMessage.payload() = boost::intrusive_ptr(0); + return isRedundant(); +} + +bool +DeliveryRecord::isEnded() const +{ + return m_ended; +} + +bool +DeliveryRecord::isRedundant() const +{ + return m_ended; +} + + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h new file mode 100644 index 0000000000..25b5446a5f --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DeliveryRecord.h + */ + +#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_ +#define tests_storePerftools_asyncPerf_DeliveryRecord_h_ + +#include "QueuedMessage.h" + +namespace qpid { +namespace broker { +class TxnHandle; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class DeliveryRecord { +public: + DeliveryRecord(const QueuedMessage& qm, + bool accepted); + virtual ~DeliveryRecord(); + bool accept(qpid::broker::TxnHandle* txn); + bool isAccepted() const; + bool setEnded(); + bool isEnded() const; + bool isRedundant() const; +private: + QueuedMessage m_queuedMessage; + bool m_accepted : 1; + bool m_ended : 1; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index 5e161d49c8..abb6b5c657 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -22,7 +22,7 @@ */ #include "MessageAsyncContext.h" -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include @@ -30,9 +30,9 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr msg, +MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr q) : + boost::shared_ptr q) : m_msg(msg), m_op(op), m_q(q) @@ -56,13 +56,13 @@ MessageAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::intrusive_ptr +boost::intrusive_ptr MessageAsyncContext::getMessage() const { return m_msg; } -boost::shared_ptr +boost::shared_ptr MessageAsyncContext::getQueue() const { return m_q; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index f13cd4ab64..8418c4c760 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -34,26 +34,26 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: - MessageAsyncContext(boost::intrusive_ptr msg, + MessageAsyncContext(boost::intrusive_ptr msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr q); + boost::shared_ptr q); virtual ~MessageAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::intrusive_ptr getMessage() const; - boost::shared_ptr getQueue() const; + boost::intrusive_ptr getMessage() const; + boost::shared_ptr getQueue() const; void destroy(); private: - boost::intrusive_ptr m_msg; + boost::intrusive_ptr m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; - boost::shared_ptr m_q; + boost::shared_ptr m_q; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 9b015fc428..1859bde947 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,9 +23,12 @@ #include "MessageConsumer.h" -#include "SimplePersistableQueue.h" +#include "SimpleQueue.h" #include "TestOptions.h" +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/TxnBuffer.h" + #include // uint32_t namespace tests { @@ -33,8 +36,12 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr queue) : + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), + m_store(store), + m_resultQueue(arq), m_queue(queue) {} @@ -44,6 +51,13 @@ MessageConsumer::~MessageConsumer() void* MessageConsumer::runConsumers() { + const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; + uint16_t txnCnt = 0U; + qpid::broker::TxnBuffer* tb = 0; + if (useTxns) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + uint32_t numMsgs = 0; while (numMsgs < m_perfTestParams.m_numMsgs) { if (m_queue->dispatch()) { @@ -52,6 +66,15 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } + + if (txnCnt) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + } else { + tb->commit(); + } + } + return reinterpret_cast(0); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index 7f5816b6a0..5404fe9f58 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -26,25 +26,37 @@ #include "boost/shared_ptr.hpp" +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace broker { +class AsyncResultQueue; +}} + namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class TestOptions; class MessageConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr queue); + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, + boost::shared_ptr queue); virtual ~MessageConsumer(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - boost::shared_ptr m_queue; + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; + boost::shared_ptr m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 008ddf33e7..0cab537fb0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,8 +23,8 @@ #include "MessageProducer.h" -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" #include "TestOptions.h" #include "TxnPublish.h" @@ -41,7 +41,7 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue) : + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -62,7 +62,7 @@ MessageProducer::runProducers() tb = new qpid::broker::TxnBuffer(m_resultQueue); } for (uint32_t numMsgs=0; numMsgs msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::intrusive_ptr msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); if (useTxns) { boost::shared_ptr op(new TxnPublish(msg)); op->deliverTo(m_queue); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 55504164ef..7fa74a2c51 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -38,7 +38,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class TestOptions; class TxnBuffer; @@ -49,7 +49,7 @@ public: const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue); + boost::shared_ptr queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); @@ -58,7 +58,7 @@ private: const char* m_msgData; qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr m_queue; + boost::shared_ptr m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 7941e761ca..e3fdd1c44d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,7 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "SimplePersistableQueue.h" +#include "SimpleQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -87,7 +87,7 @@ PerfTest::run() reinterpret_cast(mp.get()))); threads.push_back(tp); } - boost::shared_ptr mc(new MessageConsumer(m_testOpts, m_queueList[q])); + boost::shared_ptr mc(new MessageConsumer(m_testOpts, m_store, m_resultQueue, m_queueList[q])); m_consumers.push_back(mc); for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads boost::shared_ptr tp(new tests::storePerftools::common::Thread(mc->startConsumers, @@ -138,7 +138,7 @@ PerfTest::prepareQueues() for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr mpq(new SimplePersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index e42db090d2..6cdf015f76 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -48,7 +48,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -72,7 +72,7 @@ private: qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque > m_queueList; + std::deque > m_queueList; std::deque > m_producers; std::deque > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 07a80c8a33..c1c657727b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -22,7 +22,7 @@ */ #include "QueueAsyncContext.h" -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include @@ -30,7 +30,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -44,8 +44,8 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q assert(m_q.get() != 0); } -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, - boost::intrusive_ptr msg, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, + boost::intrusive_ptr msg, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -76,13 +76,13 @@ QueueAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::shared_ptr +boost::shared_ptr QueueAsyncContext::getQueue() const { return m_q; } -boost::intrusive_ptr +boost::intrusive_ptr QueueAsyncContext::getMessage() const { return m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index b4d16fe615..112a5ab1dd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -36,19 +36,19 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: - QueueAsyncContext(boost::shared_ptr q, + QueueAsyncContext(boost::shared_ptr q, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); - QueueAsyncContext(boost::shared_ptr q, - boost::intrusive_ptr msg, + QueueAsyncContext(boost::shared_ptr q, + boost::intrusive_ptr msg, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -56,8 +56,8 @@ public: virtual ~QueueAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr getQueue() const; - boost::intrusive_ptr getMessage() const; + boost::shared_ptr getQueue() const; + boost::intrusive_ptr getMessage() const; qpid::broker::TxnHandle getTxnHandle() const; qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; qpid::broker::AsyncResultCallback getAsyncResultCallback() const; @@ -65,8 +65,8 @@ public: void destroy(); private: - boost::shared_ptr m_q; - boost::intrusive_ptr m_msg; + boost::shared_ptr m_q; + boost::intrusive_ptr m_msg; qpid::broker::TxnHandle m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; qpid::broker::AsyncResultCallback m_rcb; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 8ee858587b..11af7c9466 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -23,8 +23,8 @@ #include "QueuedMessage.h" -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" #include "qpid/asyncStore/AsyncStoreImpl.h" @@ -36,8 +36,8 @@ QueuedMessage::QueuedMessage() : m_queue(0) {} -QueuedMessage::QueuedMessage(SimplePersistableQueue* q, - boost::intrusive_ptr msg) : +QueuedMessage::QueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg) : m_queue(q), m_msg(msg), m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) @@ -61,7 +61,13 @@ QueuedMessage::operator=(const QueuedMessage& rhs) return *this; } -boost::intrusive_ptr +SimpleQueue* +QueuedMessage::getQueue() const +{ + return m_queue; +} + +boost::intrusive_ptr QueuedMessage::payload() const { return m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 896c53ab5b..12c8e4da08 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -39,19 +39,20 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class QueuedMessage { public: QueuedMessage(); - QueuedMessage(SimplePersistableQueue* q, - boost::intrusive_ptr msg); + QueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg); QueuedMessage(const QueuedMessage& qm); ~QueuedMessage(); QueuedMessage& operator=(const QueuedMessage& rhs); - boost::intrusive_ptr payload() const; + SimpleQueue* getQueue() const; + boost::intrusive_ptr payload() const; const qpid::broker::EnqueueHandle& enqHandle() const; qpid::broker::EnqueueHandle& enqHandle(); @@ -61,8 +62,8 @@ public: void abortEnqueue(); private: - SimplePersistableQueue* m_queue; - boost::intrusive_ptr m_msg; + SimpleQueue* m_queue; + boost::intrusive_ptr m_msg; qpid::broker::EnqueueHandle m_enqHandle; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp new file mode 100644 index 0000000000..29db6ceaf2 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.cpp + */ + +#include "SimpleMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) +{} + +SimpleMessage::~SimpleMessage() +{} + +const qpid::broker::MessageHandle& +SimpleMessage::getHandle() const +{ + return m_msgHandle; +} + +qpid::broker::MessageHandle& +SimpleMessage::getHandle() +{ + return m_msgHandle; +} + +uint64_t +SimpleMessage::contentSize() const +{ + return static_cast(m_msg.size()); +} + +void +SimpleMessage::setPersistenceId(uint64_t id) const +{ + m_persistenceId = id; +} + +uint64_t +SimpleMessage::getPersistenceId() const +{ + return m_persistenceId; +} + +void +SimpleMessage::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putRawData(m_msg); +} + +uint32_t +SimpleMessage::encodedSize() const +{ + return static_cast(m_msg.size()); +} + +void +SimpleMessage::allDequeuesComplete() +{} + +uint32_t +SimpleMessage::encodedHeaderSize() const +{ + return 0; +} + +bool +SimpleMessage::isPersistent() const +{ + return m_msgHandle.isValid(); +} + +uint64_t +SimpleMessage::getSize() +{ + return m_msg.size(); +} + +void +SimpleMessage::write(char* target) +{ + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h new file mode 100644 index 0000000000..1b3e034814 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_ +#define tests_storePerftools_asyncPerf_SimpleMessage_h_ + +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/PersistableMessage.h" + +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class SimpleQueue; + +class SimpleMessage: public qpid::broker::PersistableMessage, + public qpid::broker::DataSource +{ +public: + SimpleMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store); + virtual ~SimpleMessage(); + const qpid::broker::MessageHandle& getHandle() const; + qpid::broker::MessageHandle& getHandle(); + uint64_t contentSize() const; + + // --- Interface Persistable --- + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // --- Interface PersistableMessage --- + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? + virtual void write(char* target); + +private: + mutable uint64_t m_persistenceId; + const std::string m_msg; + qpid::broker::MessageHandle m_msgHandle; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp deleted file mode 100644 index a9771c1442..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimplePersistableMessage.cpp - */ - -#include "SimplePersistableMessage.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -SimplePersistableMessage::SimplePersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) -{} - -SimplePersistableMessage::~SimplePersistableMessage() -{} - -const qpid::broker::MessageHandle& -SimplePersistableMessage::getHandle() const -{ - return m_msgHandle; -} - -qpid::broker::MessageHandle& -SimplePersistableMessage::getHandle() -{ - return m_msgHandle; -} - -uint64_t -SimplePersistableMessage::contentSize() const -{ - return static_cast(m_msg.size()); -} - -void -SimplePersistableMessage::setPersistenceId(uint64_t id) const -{ - m_persistenceId = id; -} - -uint64_t -SimplePersistableMessage::getPersistenceId() const -{ - return m_persistenceId; -} - -void -SimplePersistableMessage::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putRawData(m_msg); -} - -uint32_t -SimplePersistableMessage::encodedSize() const -{ - return static_cast(m_msg.size()); -} - -void -SimplePersistableMessage::allDequeuesComplete() -{} - -uint32_t -SimplePersistableMessage::encodedHeaderSize() const -{ - return 0; -} - -bool -SimplePersistableMessage::isPersistent() const -{ - return m_msgHandle.isValid(); -} - -uint64_t -SimplePersistableMessage::getSize() -{ - return m_msg.size(); -} - -void -SimplePersistableMessage::write(char* target) -{ - ::memcpy(target, m_msg.data(), m_msg.size()); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h deleted file mode 100644 index 7ac54ddea1..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimplePersistableMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ -#define tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ - -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" - -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimplePersistableQueue; - -class SimplePersistableMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource -{ -public: - SimplePersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store); - virtual ~SimplePersistableMessage(); - const qpid::broker::MessageHandle& getHandle() const; - qpid::broker::MessageHandle& getHandle(); - uint64_t contentSize() const; - - // --- Interface Persistable --- - virtual void setPersistenceId(uint64_t id) const; - virtual uint64_t getPersistenceId() const; - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - - // --- Interface PersistableMessage --- - virtual void allDequeuesComplete(); - virtual uint32_t encodedHeaderSize() const; - virtual bool isPersistent() const; - - // --- Interface DataSource --- - virtual uint64_t getSize(); // <- same as encodedSize()? - virtual void write(char* target); - -private: - mutable uint64_t m_persistenceId; - const std::string m_msg; - qpid::broker::MessageHandle m_msgHandle; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp deleted file mode 100644 index be2b4c891b..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimplePersistableQueue.cpp - */ - -#include "SimplePersistableQueue.h" - -#include "MessageDeque.h" -#include "SimplePersistableMessage.h" -#include "QueueAsyncContext.h" -#include "QueuedMessage.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/TxnHandle.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -//static -qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations - - -SimplePersistableQueue::SimplePersistableQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq) : - qpid::broker::PersistableQueue(), - m_name(name), - m_store(store), - m_resultQueue(arq), - m_asyncOpCounter(0UL), - m_persistenceId(0ULL), - m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_destroyPending(false), - m_destroyed(false), - m_barrier(*this), - m_messages(new MessageDeque()) -{ - if (m_store != 0) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); - } -} - -SimplePersistableQueue::~SimplePersistableQueue() -{ -// m_store->flush(*this); - // TODO: Make destroying the store a test parameter -// m_store->destroy(*this); -// m_store = 0; -} - -// static -void -SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { -//std::cout << "QQQ SimplePersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; - // Handle async success here - switch(qc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->getQueue()->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->getQueue()->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->getQueue()->destroyComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - qc->getQueue()->enqueueComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - qc->getQueue()->dequeueComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimplePersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } -} - -const qpid::broker::QueueHandle& -SimplePersistableQueue::getHandle() const -{ - return m_queueHandle; -} - -qpid::broker::QueueHandle& -SimplePersistableQueue::getHandle() -{ - return m_queueHandle; -} - -qpid::asyncStore::AsyncStoreImpl* -SimplePersistableQueue::getStore() -{ - return m_store; -} - -void -SimplePersistableQueue::asyncCreate() -{ - if (m_store) { - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_CREATE, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitCreate(m_queueHandle, - this, - qac); - ++m_asyncOpCounter; - } -} - -void -SimplePersistableQueue::asyncDestroy(const bool deleteQueue) -{ - m_destroyPending = true; - if (m_store) { - if (deleteQueue) { - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, - &handleAsyncResult, - &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, - qac); - ++m_asyncOpCounter; - } - m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); - } -} - -void -SimplePersistableQueue::deliver(boost::intrusive_ptr msg) -{ - QueuedMessage qm(this, msg); - enqueue(s_nullTxnHandle, qm); - push(qm); -} - -bool -SimplePersistableQueue::dispatch() -{ - QueuedMessage qm; - if (m_messages->consume(qm)) { - return dequeue(s_nullTxnHandle, qm); - } - return false; -} - -bool -SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) -{ - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); - } - return false; -} - -bool -SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) -{ - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); - } - return true; -} - -void -SimplePersistableQueue::process(boost::intrusive_ptr msg) -{ - QueuedMessage qm(this, msg); - push(qm); -} - -void -SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr /*msg*/) -{} - -void -SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putShortString(m_name); -} - -uint32_t -SimplePersistableQueue::encodedSize() const -{ - return m_name.size() + 1; -} - -uint64_t -SimplePersistableQueue::getPersistenceId() const -{ - return m_persistenceId; -} - -void -SimplePersistableQueue::setPersistenceId(uint64_t persistenceId) const -{ - m_persistenceId = persistenceId; -} - -void -SimplePersistableQueue::flush() -{ - //if(m_store) m_store->flush(*this); -} - -const std::string& -SimplePersistableQueue::getName() const -{ - return m_name; -} - -void -SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ - if (externalQueueStore != inst && externalQueueStore) - delete externalQueueStore; - externalQueueStore = inst; -} - -uint64_t -SimplePersistableQueue::getSize() -{ - return m_persistableData.size(); -} - -void -SimplePersistableQueue::write(char* target) -{ - ::memcpy(target, m_persistableData.data(), m_persistableData.size()); -} - -// --- Members & methods in msg handling path from qpid::Queue --- - -// protected -SimplePersistableQueue::UsageBarrier::UsageBarrier(SimplePersistableQueue& q) : - m_parent(q), - m_count(0) -{} - -// protected -bool -SimplePersistableQueue::UsageBarrier::acquire() -{ - qpid::sys::Monitor::ScopedLock l(m_monitor); - if (m_parent.m_destroyed) { - return false; - } else { - ++m_count; - return true; - } -} - -// protected -void SimplePersistableQueue::UsageBarrier::release() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - if (--m_count == 0) { - m_monitor.notifyAll(); - } -} - -// protected -void SimplePersistableQueue::UsageBarrier::destroy() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - m_parent.m_destroyed = true; - while (m_count) { - m_monitor.wait(); - } -} - -// protected -SimplePersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : - m_barrier(b), - m_acquired(m_barrier.acquire()) -{} - -// protected -SimplePersistableQueue::ScopedUse::~ScopedUse() -{ - if (m_acquired) { - m_barrier.release(); - } -} - -// private -void -SimplePersistableQueue::push(QueuedMessage& qm, - bool /*isRecovery*/) -{ - QueuedMessage removed; - m_messages->push(qm, removed); -} - -// --- End Members & methods in msg handling path from qpid::Queue --- - -// private -bool -SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) -{ - qm.payload()->setPersistenceId(m_store->getNextRid()); -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), - th, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - &handleAsyncResult, - &m_resultQueue)); - if (th.isValid()) { - th.incrOpCnt(); - } - m_store->submitEnqueue(qm.enqHandle(), - th, - qac); - ++m_asyncOpCounter; - return true; -} - -// private -bool -SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) -{ -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), - th, - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - &handleAsyncResult, - &m_resultQueue)); - if (th.isValid()) { - th.incrOpCnt(); - } - m_store->submitDequeue(qm.enqHandle(), - th, - qac); - ++m_asyncOpCounter; - return true; -} - -// private -void -SimplePersistableQueue::destroyCheck(const std::string& opDescr) const -{ - if (m_destroyPending || m_destroyed) { - std::ostringstream oss; - oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; - throw qpid::Exception(oss.str()); - } -} - -// private -void -SimplePersistableQueue::createComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -// private -void -SimplePersistableQueue::flushComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; -} - -// private -void -SimplePersistableQueue::destroyComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - m_destroyed = true; -} - -// private -void -SimplePersistableQueue::enqueueComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); - } -} - -// private -void -SimplePersistableQueue::dequeueComplete(const boost::shared_ptr qc) -{ -//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); - } -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h deleted file mode 100644 index 34ef9407ac..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimplePersistableQueue.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ -#define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ - -#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Monitor.h" - -#include -#include -#include - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} -namespace broker { -class AsyncResultQueue; -class TxnHandle; -} -namespace framing { -class FieldTable; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class Messages; -class SimplePersistableMessage; -class QueueAsyncContext; -class QueuedMessage; - -class SimplePersistableQueue : public boost::enable_shared_from_this, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource -{ -public: - SimplePersistableQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq); - virtual ~SimplePersistableQueue(); - - static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); - const qpid::broker::QueueHandle& getHandle() const; - qpid::broker::QueueHandle& getHandle(); - qpid::asyncStore::AsyncStoreImpl* getStore(); - - void asyncCreate(); - void asyncDestroy(const bool deleteQueue); - - // --- Methods in msg handling path from qpid::Queue --- - void deliver(boost::intrusive_ptr msg); - bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param - bool enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - bool dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - void process(boost::intrusive_ptr msg); - void enqueueAborted(boost::intrusive_ptr msg); - - // --- Interface qpid::broker::Persistable --- - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - virtual uint64_t getPersistenceId() const; - virtual void setPersistenceId(uint64_t persistenceId) const; - - // --- Interface qpid::broker::PersistableQueue --- - virtual void flush(); - virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - - // --- Interface DataStore --- - virtual uint64_t getSize(); - virtual void write(char* target); - -private: - static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations - - const std::string m_name; - qpid::asyncStore::AsyncStoreImpl* m_store; - qpid::broker::AsyncResultQueue& m_resultQueue; - qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; - mutable uint64_t m_persistenceId; - std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; - bool m_destroyPending; - bool m_destroyed; - - // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier - { - SimplePersistableQueue& m_parent; - uint32_t m_count; - qpid::sys::Monitor m_monitor; - UsageBarrier(SimplePersistableQueue& q); - bool acquire(); - void release(); - void destroy(); - }; - struct ScopedUse - { - UsageBarrier& m_barrier; - const bool m_acquired; - ScopedUse(UsageBarrier& b); - ~ScopedUse(); - }; - UsageBarrier m_barrier; - std::auto_ptr m_messages; - void push(QueuedMessage& qm, - bool isRecovery = false); - - // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - bool asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - - // --- Async op counter --- - void destroyCheck(const std::string& opDescr) const; - - // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const boost::shared_ptr qc); - void flushComplete(const boost::shared_ptr qc); - void destroyComplete(const boost::shared_ptr qc); - void enqueueComplete(const boost::shared_ptr qc); - void dequeueComplete(const boost::shared_ptr qc); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp new file mode 100644 index 0000000000..d8b312f011 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.cpp + */ + +#include "SimpleQueue.h" + +#include "MessageDeque.h" +#include "SimpleMessage.h" +#include "QueueAsyncContext.h" +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/TxnHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +//static +qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations + + +SimpleQueue::SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq) : + qpid::broker::PersistableQueue(), + m_name(name), + m_store(store), + m_resultQueue(arq), + m_asyncOpCounter(0UL), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new MessageDeque()) +{ + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } +} + +SimpleQueue::~SimpleQueue() +{ +// m_store->flush(*this); + // TODO: Make destroying the store a test parameter +// m_store->destroy(*this); +// m_store = 0; +} + +// static +void +SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) +{ + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { +//std::cout << "QQQ SimpleQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; + // Handle async success here + switch(qc->getOpCode()) { + case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: + qc->getQueue()->createComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: + qc->getQueue()->flushComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: + qc->getQueue()->destroyComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + qc->getQueue()->enqueueComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + qc->getQueue()->dequeueComplete(qc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); + throw qpid::Exception(oss.str()); + }; + } + } +} + +const qpid::broker::QueueHandle& +SimpleQueue::getHandle() const +{ + return m_queueHandle; +} + +qpid::broker::QueueHandle& +SimpleQueue::getHandle() +{ + return m_queueHandle; +} + +qpid::asyncStore::AsyncStoreImpl* +SimpleQueue::getStore() +{ + return m_store; +} + +void +SimpleQueue::asyncCreate() +{ + if (m_store) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, + qpid::asyncStore::AsyncOperation::QUEUE_CREATE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitCreate(m_queueHandle, + this, + qac); + ++m_asyncOpCounter; + } +} + +void +SimpleQueue::asyncDestroy(const bool deleteQueue) +{ + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, + qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitDestroy(m_queueHandle, + qac); + ++m_asyncOpCounter; + } + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); + } +} + +void +SimpleQueue::deliver(boost::intrusive_ptr msg) +{ + QueuedMessage qm(this, msg); + enqueue(s_nullTxnHandle, qm); + push(qm); +} + +bool +SimpleQueue::dispatch() +{ + QueuedMessage qm; + if (m_messages->consume(qm)) { + return dequeue(s_nullTxnHandle, qm); + } + return false; +} + +bool +SimpleQueue::enqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) +{ + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(th, qm); + } + return false; +} + +bool +SimpleQueue::dequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) +{ + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(th, qm); + } + return true; +} + +void +SimpleQueue::process(boost::intrusive_ptr msg) +{ + QueuedMessage qm(this, msg); + push(qm); +} + +void +SimpleQueue::enqueueAborted(boost::intrusive_ptr /*msg*/) +{} + +void +SimpleQueue::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(m_name); +} + +uint32_t +SimpleQueue::encodedSize() const +{ + return m_name.size() + 1; +} + +uint64_t +SimpleQueue::getPersistenceId() const +{ + return m_persistenceId; +} + +void +SimpleQueue::setPersistenceId(uint64_t persistenceId) const +{ + m_persistenceId = persistenceId; +} + +void +SimpleQueue::flush() +{ + //if(m_store) m_store->flush(*this); +} + +const std::string& +SimpleQueue::getName() const +{ + return m_name; +} + +void +SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +{ + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +SimpleQueue::getSize() +{ + return m_persistableData.size(); +} + +void +SimpleQueue::write(char* target) +{ + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +SimpleQueue::UsageBarrier::acquire() +{ + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void SimpleQueue::UsageBarrier::release() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void SimpleQueue::UsageBarrier::destroy() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +SimpleQueue::ScopedUse::~ScopedUse() +{ + if (m_acquired) { + m_barrier.release(); + } +} + +// private +void +SimpleQueue::push(QueuedMessage& qm, + bool /*isRecovery*/) +{ + QueuedMessage removed; + m_messages->push(qm, removed); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// private +bool +SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) +{ + qm.payload()->setPersistenceId(m_store->getNextRid()); +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + th, + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + &handleAsyncResult, + &m_resultQueue)); + if (th.isValid()) { + th.incrOpCnt(); + } + m_store->submitEnqueue(qm.enqHandle(), + th, + qac); + ++m_asyncOpCounter; + return true; +} + +// private +bool +SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) +{ +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + th, + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + &handleAsyncResult, + &m_resultQueue)); + if (th.isValid()) { + th.incrOpCnt(); + } + m_store->submitDequeue(qm.enqHandle(), + th, + qac); + ++m_asyncOpCounter; + return true; +} + +// private +void +SimpleQueue::destroyCheck(const std::string& opDescr) const +{ + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + +// private +void +SimpleQueue::createComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::flushComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::destroyComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + m_destroyed = true; +} + +// private +void +SimpleQueue::enqueueComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } +} + +// private +void +SimpleQueue::dequeueComplete(const boost::shared_ptr qc) +{ +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + + qpid::broker::TxnHandle th = qc->getTxnHandle(); + if (th.isValid()) { // transactional enqueue + th.decrOpCnt(); + } +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h new file mode 100644 index 0000000000..bc9dda0d98 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_ +#define tests_storePerftools_asyncPerf_SimpleQueue_h_ + +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Monitor.h" + +#include +#include +#include + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace broker { +class AsyncResultQueue; +class TxnHandle; +} +namespace framing { +class FieldTable; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class Messages; +class SimpleMessage; +class QueueAsyncContext; +class QueuedMessage; + +class SimpleQueue : public boost::enable_shared_from_this, + public qpid::broker::PersistableQueue, + public qpid::broker::DataSource +{ +public: + SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& args, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq); + virtual ~SimpleQueue(); + + static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); + const qpid::broker::QueueHandle& getHandle() const; + qpid::broker::QueueHandle& getHandle(); + qpid::asyncStore::AsyncStoreImpl* getStore(); + + void asyncCreate(); + void asyncDestroy(const bool deleteQueue); + + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::intrusive_ptr msg); + bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param + bool enqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm); + bool dequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm); + void process(boost::intrusive_ptr msg); + void enqueueAborted(boost::intrusive_ptr msg); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + + // --- Interface DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations + + const std::string m_name; + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; + mutable uint64_t m_persistenceId; + std::string m_persistableData; + qpid::broker::QueueHandle m_queueHandle; + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier + { + SimpleQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(SimpleQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse + { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr m_messages; + void push(QueuedMessage& qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm); + bool asyncDequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- + void createComplete(const boost::shared_ptr qc); + void flushComplete(const boost::shared_ptr qc); + void destroyComplete(const boost::shared_ptr qc); + void enqueueComplete(const boost::shared_ptr qc); + void dequeueComplete(const boost::shared_ptr qc); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp new file mode 100644 index 0000000000..c1d35805a6 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnAccept.cpp + */ + +#include "TxnAccept.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TxnAccept::TxnAccept() +{} + +TxnAccept::~TxnAccept() +{} + +// --- Interface TxnOp --- + +bool +TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw() +{ + return false; +} + +void +TxnAccept::commit() throw() +{} + +void +TxnAccept::rollback() throw() +{} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h new file mode 100644 index 0000000000..f164a4c965 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnAccept.h + */ + +#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_ +#define tests_storePerftools_asyncPerf_TxnAccept_h_ + +#include "qpid/broker/TxnOp.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TxnAccept: public qpid::broker::TxnOp { +public: + TxnAccept(); + virtual ~TxnAccept(); + + // --- Interface TxnOp --- + bool prepare(qpid::broker::TxnHandle& th) throw(); + void commit() throw(); + void rollback() throw(); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TxnAccept_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index eff7646de6..2927dc60e2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -21,8 +21,8 @@ * \file TxnPublish.cpp */ -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" // debug msg +#include "SimpleMessage.h" +#include "SimpleQueue.h" // debug msg #include "TxnPublish.h" #include "QueuedMessage.h" @@ -31,7 +31,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -TxnPublish::TxnPublish(boost::intrusive_ptr msg) : +TxnPublish::TxnPublish(boost::intrusive_ptr msg) : m_msg(msg) { //std::cout << "TTT new TxnPublish" << std::endl << std::flush; @@ -96,7 +96,7 @@ TxnPublish::contentSize() } void -TxnPublish::deliverTo(const boost::shared_ptr& queue) +TxnPublish::deliverTo(const boost::shared_ptr& queue) { //std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; boost::shared_ptr qm(new QueuedMessage(queue.get(), m_msg)); @@ -104,7 +104,7 @@ TxnPublish::deliverTo(const boost::shared_ptr& queue) m_delivered = true; } -SimplePersistableMessage& +SimpleMessage& TxnPublish::getMessage() { return *m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h index 6e97e99349..a7255314bd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -34,9 +34,7 @@ namespace qpid { namespace broker { - class TransactionContext; - }} namespace tests { @@ -44,14 +42,14 @@ namespace storePerftools { namespace asyncPerf { class QueuedMessage; -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class TxnPublish : public qpid::broker::TxnOp, public Deliverable { public: - TxnPublish(boost::intrusive_ptr msg); + TxnPublish(boost::intrusive_ptr msg); virtual ~TxnPublish(); // --- Interface TxOp --- @@ -61,11 +59,11 @@ public: // --- Interface Deliverable --- uint64_t contentSize(); - void deliverTo(const boost::shared_ptr& queue); - SimplePersistableMessage& getMessage(); + void deliverTo(const boost::shared_ptr& queue); + SimpleMessage& getMessage(); private: - boost::intrusive_ptr m_msg; + boost::intrusive_ptr m_msg; std::list > m_queues; std::list > m_prepared; }; -- cgit v1.2.1