diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
commit | a804510d81ade0594a75b5c9b8765cafcc233245 (patch) | |
tree | 8c6be643564b6d8c88619d17de7150c98a314781 | |
parent | 1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff) | |
download | qpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz |
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
65 files changed, 952 insertions, 634 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 6077bade08..1f3c2a739d 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -666,6 +666,9 @@ include (rdma.cmake) # Check for optional SSL support requirements include (ssl.cmake) +# Check for optional async store build requirements +#include (asyncstore.cmake) + # Check for syslog capabilities not present on all systems check_symbol_exists (LOG_AUTHPRIV "sys/syslog.h" HAVE_LOG_AUTHPRIV) check_symbol_exists (LOG_FTP "sys/syslog.h" HAVE_LOG_FTP) @@ -1147,6 +1150,13 @@ set (qpidbroker_SOURCES qpid/management/ManagementDirectExchange.cpp qpid/management/ManagementTopicExchange.cpp qpid/sys/TCPIOPlugin.cpp +# New async store objects and new versions of broker objects +# qpid/broker/AsyncResultHandle.cpp +# qpid/broker/AsyncResultHandleImpl.cpp +# qpid/broker/IdHandle.cpp +# qpid/broker/TxnAsyncContext.cpp +# qpid/broker/TxnBuffer.cpp +# qpid/broker/TxnHandle.cpp ) add_msvc_version (qpidbroker library dll) add_library (qpidbroker SHARED ${qpidbroker_SOURCES}) @@ -1459,71 +1469,12 @@ if (UNIX) COMPONENT ${QPID_COMPONENT_COMMON}) endif (UNIX) - # Async Store support - -# Journal 2 source files -set (jrnl2_SOURCES - qpid/asyncStore/jrnl2/AsyncJournal.cpp - qpid/asyncStore/jrnl2/DataOpState.cpp - qpid/asyncStore/jrnl2/DataToken.cpp - qpid/asyncStore/jrnl2/DataWrComplState.cpp - qpid/asyncStore/jrnl2/DequeueHeader.cpp - qpid/asyncStore/jrnl2/EnqueueHeader.cpp - qpid/asyncStore/jrnl2/EventHeader.cpp - qpid/asyncStore/jrnl2/FileHeader.cpp - qpid/asyncStore/jrnl2/JournalDirectory.cpp - qpid/asyncStore/jrnl2/JournalError.cpp - qpid/asyncStore/jrnl2/JournalParameters.cpp - qpid/asyncStore/jrnl2/JournalRunState.cpp - qpid/asyncStore/jrnl2/RecordHeader.cpp - qpid/asyncStore/jrnl2/RecordTail.cpp - qpid/asyncStore/jrnl2/ScopedLock.cpp - qpid/asyncStore/jrnl2/Streamable.cpp - qpid/asyncStore/jrnl2/TransactionHeader.cpp -) - -# AsyncStore source files -set (asyncStore_SOURCES - qpid/asyncStore/AsyncOperation.cpp - qpid/asyncStore/AsyncStoreImpl.cpp - qpid/asyncStore/AsyncStoreOptions.cpp - qpid/asyncStore/ConfigHandleImpl.cpp - qpid/asyncStore/EnqueueHandleImpl.cpp - qpid/asyncStore/EventHandleImpl.cpp - qpid/asyncStore/MessageHandleImpl.cpp - qpid/asyncStore/OperationQueue.cpp - qpid/asyncStore/Plugin.cpp - qpid/asyncStore/QueueHandleImpl.cpp - qpid/asyncStore/RunState.cpp - qpid/asyncStore/TxnHandleImpl.cpp - qpid/broker/AsyncResultHandle.cpp - qpid/broker/AsyncResultHandleImpl.cpp - qpid/broker/AsyncResultQueueImpl.cpp - qpid/broker/ConfigHandle.cpp - qpid/broker/EnqueueHandle.cpp - qpid/broker/EventHandle.cpp - qpid/broker/IdHandle.cpp - qpid/broker/MessageHandle.cpp - qpid/broker/QueueHandle.cpp - qpid/broker/TxnAsyncContext.cpp - qpid/broker/TxnBuffer.cpp - qpid/broker/TxnHandle.cpp -) - -if (UNIX) - add_library (asyncStore MODULE - ${jrnl2_SOURCES} - ${asyncStore_SOURCES} - ) - set_target_properties (asyncStore PROPERTIES - PREFIX "" - OUTPUT_NAME asyncStore - SOVERSION ${asyncStore_version} - ) - target_link_libraries (asyncStore - aio - rt - uuid - ) -endif (UNIX) +# ------------------- +# NOTE: Currently this must be included AFTER the "add_subdirectory(tests)" +# line (a few lines above) to work - otherwise cmake complains of linking +# to lib of type MODULE_LIBRARY (the fact that position of include makes a +# difference might be a possible CMake bug?). The link issue is being +# corrected separately, and when it is, this can be moved up to the location +# of the other module includes. +include (asyncstore.cmake) diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake new file mode 100644 index 0000000000..af9a358d1e --- /dev/null +++ b/cpp/src/asyncstore.cmake @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# +# Async store library CMake fragment, to be included in CMakeLists.txt +# + +# Journal 2 source files +set (jrnl2_SOURCES + qpid/asyncStore/jrnl2/AsyncJournal.cpp + qpid/asyncStore/jrnl2/DataOpState.cpp + qpid/asyncStore/jrnl2/DataToken.cpp + qpid/asyncStore/jrnl2/DataWrComplState.cpp + qpid/asyncStore/jrnl2/DequeueHeader.cpp + qpid/asyncStore/jrnl2/EnqueueHeader.cpp + qpid/asyncStore/jrnl2/EventHeader.cpp + qpid/asyncStore/jrnl2/FileHeader.cpp + qpid/asyncStore/jrnl2/JournalDirectory.cpp + qpid/asyncStore/jrnl2/JournalError.cpp + qpid/asyncStore/jrnl2/JournalParameters.cpp + qpid/asyncStore/jrnl2/JournalRunState.cpp + qpid/asyncStore/jrnl2/RecordHeader.cpp + qpid/asyncStore/jrnl2/RecordTail.cpp + qpid/asyncStore/jrnl2/ScopedLock.cpp + qpid/asyncStore/jrnl2/Streamable.cpp + qpid/asyncStore/jrnl2/TransactionHeader.cpp +) + +# AsyncStore source files +set (asyncStore_SOURCES + qpid/asyncStore/AsyncOperation.cpp + qpid/asyncStore/AsyncStoreImpl.cpp + qpid/asyncStore/AsyncStoreOptions.cpp + qpid/asyncStore/ConfigHandleImpl.cpp + qpid/asyncStore/EnqueueHandleImpl.cpp + qpid/asyncStore/EventHandleImpl.cpp + qpid/asyncStore/MessageHandleImpl.cpp + qpid/asyncStore/OperationQueue.cpp + qpid/asyncStore/Plugin.cpp + qpid/asyncStore/QueueHandleImpl.cpp + qpid/asyncStore/RunState.cpp + qpid/asyncStore/TxnHandleImpl.cpp + qpid/broker/AsyncResultHandle.cpp + qpid/broker/AsyncResultHandleImpl.cpp + qpid/broker/AsyncResultQueueImpl.cpp + qpid/broker/ConfigHandle.cpp + qpid/broker/EnqueueHandle.cpp + qpid/broker/EventHandle.cpp + qpid/broker/MessageHandle.cpp + qpid/broker/QueueHandle.cpp + qpid/broker/TxnAsyncContext.cpp + qpid/broker/TxnBuffer.cpp + qpid/broker/TxnHandle.cpp +) + +if (UNIX) + add_library (asyncStore MODULE + ${jrnl2_SOURCES} + ${asyncStore_SOURCES} + ) + set_target_properties (asyncStore PROPERTIES + PREFIX "" + OUTPUT_NAME asyncStore + SOVERSION ${asyncStore_version} + ) + target_link_libraries (asyncStore + aio + rt + uuid + ) +endif (UNIX) diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index bcaac5c548..b8fdb8b140 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -38,7 +38,7 @@ AsyncOperation::AsyncOperation() : {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), @@ -48,7 +48,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), @@ -59,7 +59,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), @@ -70,7 +70,7 @@ AsyncOperation::AsyncOperation(const opCode op, {} AsyncOperation::AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index 188750e910..cb73ad639b 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -25,10 +25,10 @@ #define qpid_asyncStore_AsyncOperation_h_ #include "qpid/broker/AsyncStore.h" -#include "qpid/broker/IdHandle.h" namespace qpid { namespace asyncStore { +class AsyncStoreHandle; class AsyncOperation { public: @@ -49,18 +49,18 @@ public: AsyncOperation(); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, - const qpid::broker::IdHandle* th, + const AsyncStoreHandle* th, const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); @@ -71,7 +71,7 @@ public: private: opCode m_op; - const qpid::broker::IdHandle* m_targetHandle; + const AsyncStoreHandle* m_targetHandle; const qpid::broker::DataSource* const m_dataSrc; const qpid::broker::TxnHandle* m_txnHandle; boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; diff --git a/cpp/src/qpid/broker/IdHandle.h b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h index ddf94a2396..97111d1b84 100644 --- a/cpp/src/qpid/broker/IdHandle.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h @@ -18,20 +18,20 @@ */ /** - * \file IdHandle.h + * \file AsyncStoreHandle.h */ -#ifndef qpid_broker_IdHandle_h_ -#define qpid_broker_IdHandle_h_ +#ifndef qpid_asyncStore_AsyncStoreHandle_h_ +#define qpid_asyncStore_AsyncStoreHandle_h_ namespace qpid { -namespace broker { +namespace asyncStore { -class IdHandle { +class AsyncStoreHandle { public: - virtual ~IdHandle(); + virtual ~AsyncStoreHandle() {} }; -}} // namespace qpid::broker +}} // namespace qpid::asyncStore -#endif // qpid_broker_IdHandle_h_ +#endif // qpid_asyncStore_AsyncStoreHandle_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 9135fcc27e..6b5c3ac582 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,7 +23,11 @@ #include "AsyncStoreImpl.h" -#include "AsyncOperation.h" +#include "ConfigHandleImpl.h" +#include "EnqueueHandleImpl.h" +#include "EventHandleImpl.h" +#include "MessageHandleImpl.h" +#include "QueueHandleImpl.h" #include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" @@ -33,8 +37,6 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/intrusive_ptr.hpp> - namespace qpid { namespace asyncStore { @@ -88,6 +90,36 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); } +void +AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -98,14 +130,16 @@ qpid::broker::EnqueueHandle AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, qpid::broker::QueueHandle& queueHandle) { - return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, + queueHandle)); } qpid::broker::EventHandle AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, const std::string& key) { - return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key)); + return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, + key)); } qpid::broker::MessageHandle @@ -123,42 +157,12 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, } void -AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -169,7 +173,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), brokerCtxt)); m_operations.submit(op); } @@ -180,7 +184,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -191,7 +195,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -201,19 +205,7 @@ AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -225,7 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), dataSrc, &txnHandle, brokerCtxt)); @@ -234,21 +226,11 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -260,7 +242,7 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -272,7 +254,7 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index a00771abf5..7dee03dc6d 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -30,17 +30,19 @@ #include "qpid/asyncStore/jrnl2/RecordIdCounter.h" #include "qpid/broker/AsyncStore.h" -#include "qpid/sys/Poller.h" namespace qpid { - namespace broker { class Broker; -} // namespace qpid::broker +} + +namespace sys { +class Poller; +} namespace asyncStore { -class AsyncStoreImpl: public qpid::broker::AsyncStore { +class AsyncStoreImpl : public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, const AsyncStoreOptions& opts); @@ -52,12 +54,23 @@ public: void initManagement(qpid::broker::Broker* broker); - // --- Factory methods for creating handles --- + // --- Interface from AsyncTransactionalStore --- qpid::broker::TxnHandle createTxnHandle(); qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); qpid::broker::TxnHandle createTxnHandle(const std::string& xid); - qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid, + qpid::broker::TxnBuffer* tb); + + void submitPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + void submitCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + void submitAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + + + // --- Interface from AsyncStore --- qpid::broker::ConfigHandle createConfigHandle(); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, @@ -68,16 +81,6 @@ public: qpid::broker::QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts); - - // --- Store async interface --- - - void submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); @@ -94,14 +97,9 @@ public: void submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::EventHandle& eventHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - void submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h index 2849e3d2c6..a1325839d9 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h @@ -24,16 +24,11 @@ #ifndef qpid_asyncStore_AsyncStoreOptions_h_ #define qpid_asyncStore_AsyncStoreOptions_h_ -#include "qpid/asyncStore/jrnl2/Streamable.h" - #include "qpid/Options.h" #include <string> namespace qpid { -namespace broker { -class Options; -} namespace asyncStore { class AsyncStoreOptions : public qpid::Options @@ -49,7 +44,8 @@ public: std::string m_storeDir; private: - // Static initialization race condition avoidance with static instance of Plugin class (using construct-on-first-use idiom). + // Static initialization race condition avoidance with static instance of Plugin class + // (using construct-on-first-use idiom). static std::string& getDefaultStoreDir(); }; diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp index 64e2e848fa..fc47f330a4 100644 --- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp @@ -23,8 +23,6 @@ #include "ConfigHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp index 0e291def78..be87831520 100644 --- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp @@ -23,8 +23,6 @@ #include "EnqueueHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h index f976b6d246..2788366c5c 100644 --- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h +++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class MessageHandle; class QueueHandle; diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp index 05b98d1e5d..4fd8805912 100644 --- a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp @@ -23,8 +23,6 @@ #include "EventHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.h b/cpp/src/qpid/asyncStore/EventHandleImpl.h index 1c6bc52f9f..9a0e694d12 100644 --- a/cpp/src/qpid/asyncStore/EventHandleImpl.h +++ b/cpp/src/qpid/asyncStore/EventHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class QueueHandle; } diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp index cea039221a..a926aa161f 100644 --- a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp @@ -23,8 +23,6 @@ #include "MessageHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.h b/cpp/src/qpid/asyncStore/MessageHandleImpl.h index eb80fca2d4..ded9d63375 100644 --- a/cpp/src/qpid/asyncStore/MessageHandleImpl.h +++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.h @@ -27,7 +27,6 @@ #include "qpid/RefCounted.h" namespace qpid { - namespace broker { class DataSource; } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 0b7f58bd6c..dd4e7c1343 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -24,6 +24,8 @@ #include "OperationQueue.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncResultHandleImpl.h" +#include "qpid/log/Statement.h" namespace qpid { namespace asyncStore { @@ -42,7 +44,6 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op) { -//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -52,7 +53,6 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); if (bc) { qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); @@ -64,9 +64,9 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) } } } catch (const std::exception& e) { - std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl; + QPID_LOG(error, "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what()); } catch (...) { - std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl; + QPID_LOG(error, "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op"); } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index d2bd5d0f26..143ac2ab0c 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -26,7 +26,7 @@ #include "AsyncOperation.h" -#include "qpid/broker/AsyncStore.h" +//#include "qpid/broker/AsyncStore.h" #include "qpid/sys/PollableQueue.h" namespace qpid { diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index 4f35e8cd2a..400237df75 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -23,6 +23,8 @@ #include "Plugin.h" +#include "AsyncStoreImpl.h" + #include "qpid/broker/Broker.h" namespace qpid { diff --git a/cpp/src/qpid/asyncStore/Plugin.h b/cpp/src/qpid/asyncStore/Plugin.h index 7cf500a122..7d2b67bdad 100644 --- a/cpp/src/qpid/asyncStore/Plugin.h +++ b/cpp/src/qpid/asyncStore/Plugin.h @@ -24,13 +24,15 @@ #ifndef qpid_broker_Plugin_h_ #define qpid_broker_Plugin_h_ -#include "AsyncStoreImpl.h" #include "AsyncStoreOptions.h" #include "qpid/Plugin.h" namespace qpid { class Options; +namespace asyncStore { +class AsyncStoreImpl; +} namespace broker { class Plugin : public qpid::Plugin diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp index 523a31ba7b..3781329b92 100644 --- a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp @@ -23,8 +23,6 @@ #include "QueueHandleImpl.h" -#include "qpid/messaging/PrivateImplRef.h" - namespace qpid { namespace asyncStore { diff --git a/cpp/src/qpid/asyncStore/RunState.h b/cpp/src/qpid/asyncStore/RunState.h index dfa331ea1a..832e922a04 100644 --- a/cpp/src/qpid/asyncStore/RunState.h +++ b/cpp/src/qpid/asyncStore/RunState.h @@ -59,7 +59,7 @@ typedef enum { RS_STOPPED } RunState_t; -class RunState: public qpid::asyncStore::jrnl2::State<RunState_t> +class RunState : public qpid::asyncStore::jrnl2::State<RunState_t> { public: RunState(); diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index c5371f161c..2b343e9517 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -25,7 +25,7 @@ #include "qpid/Exception.h" #include "qpid/broker/TxnBuffer.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "qpid/log/Statement.h" #include <uuid/uuid.h> @@ -117,7 +117,7 @@ TxnHandleImpl::createLocalXid() char uuidStr[37]; // 36-char uuid + trailing '\0' ::uuid_unparse(uuid, uuidStr); m_xid.assign(uuidStr); -//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush; + QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); } }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp index cdd2231977..d2fa9ae3e0 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp @@ -23,21 +23,22 @@ #include "AsyncResultHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "AsyncResultHandleImpl.h" +#include "PrivateImplRef.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<AsyncResultHandle> PrivateImpl; +typedef PrivateImplRef<AsyncResultHandle> PrivateImpl; AsyncResultHandle::AsyncResultHandle(AsyncResultHandleImpl* p) : - qpid::messaging::Handle<AsyncResultHandleImpl>() + Handle<AsyncResultHandleImpl>() { PrivateImpl::ctor(*this, p); } AsyncResultHandle::AsyncResultHandle(const AsyncResultHandle& r) : - qpid::messaging::Handle<AsyncResultHandleImpl>() + Handle<AsyncResultHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h index f916bde5d3..cf0fea5a06 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.h +++ b/cpp/src/qpid/broker/AsyncResultHandle.h @@ -24,14 +24,17 @@ #ifndef qpid_broker_AsyncResultHandle_h_ #define qpid_broker_AsyncResultHandle_h_ -#include "AsyncResultHandleImpl.h" +#include "Handle.h" -#include "qpid/messaging/Handle.h" +#include <boost/shared_ptr.hpp> +#include <string> namespace qpid { namespace broker { +class AsyncResultHandleImpl; +class BrokerAsyncContext; -class AsyncResultHandle : public qpid::messaging::Handle<AsyncResultHandleImpl> +class AsyncResultHandle : public Handle<AsyncResultHandleImpl> { public: AsyncResultHandle(AsyncResultHandleImpl* p = 0); @@ -47,7 +50,7 @@ public: void invokeAsyncResultCallback() const; private: - friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>; + friend class PrivateImplRef<AsyncResultHandle>; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp index 62c7ed33d9..ab408be9ca 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -21,9 +21,12 @@ * \file AsyncResultQueueImpl.cpp */ -#include "AsyncResultHandle.h" #include "AsyncResultQueueImpl.h" +#include "AsyncResultHandle.h" + +#include "qpid/log/Statement.h" + namespace qpid { namespace broker { @@ -41,7 +44,6 @@ AsyncResultQueueImpl::~AsyncResultQueueImpl() void AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) { -//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush; m_resQueue.push(arh); } @@ -51,15 +53,14 @@ AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) { try { for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; if ((*i)->isValid()) { (*i)->invokeAsyncResultCallback(); } } } catch (const std::exception& e) { - std::cerr << "qpid::broker::AsyncResultQueueImpl: Exception thrown processing async result: " << e.what() << std::endl; + QPID_LOG(error, "Exception thrown processing async result: " << e.what()); } catch (...) { - std::cerr << "qpid::broker::AsyncResultQueueImpl: Unknown exception thrown processing async result" << std::endl; + QPID_LOG(error, "Unknown exception thrown processing async result"); } return e.end(); } diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 2f73ec8f3e..7bb6175862 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -20,12 +20,10 @@ #ifndef qpid_broker_AsyncStore_h_ #define qpid_broker_AsyncStore_h_ -// TODO: See if we can replace this with a forward declaration, but current definition of qpid::types::Variant::Map -// does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration. #include "qpid/types/Variant.h" // qpid::types::Variant::Map #include <boost/shared_ptr.hpp> -#include <stdint.h> +#include <stdint.h> // uint64_t #include <string> namespace qpid { @@ -57,66 +55,91 @@ public: virtual void write(char* target) = 0; }; -// Callback invoked by AsyncResultQueue to pass back async results -typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); +// Opaque async handles used for carrying persistence state. class ConfigHandle; class EnqueueHandle; class EventHandle; class MessageHandle; class QueueHandle; -class TxnBuffer; class TxnHandle; -class AsyncTransactionalStore { +class TxnBuffer; + +class AsyncTransaction { public: - virtual ~AsyncTransactionalStore() {} + virtual ~AsyncTransaction() {} virtual TxnHandle createTxnHandle() = 0; virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0; - - // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid, + TxnBuffer* tb) = 0; + + // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface + virtual void submitPrepare(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; }; // Subclassed by store: -class AsyncStore : public AsyncTransactionalStore { +class AsyncStore : public AsyncTransaction { public: virtual ~AsyncStore() {} // --- Factory methods for creating handles --- virtual ConfigHandle createConfigHandle() = 0; - virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; - virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; + virtual EnqueueHandle createEnqueueHandle(MessageHandle&, + QueueHandle&) = 0; + virtual EventHandle createEventHandle(QueueHandle&, + const std::string& key=std::string()) = 0; virtual MessageHandle createMessageHandle(const DataSource* const) = 0; - virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; + virtual QueueHandle createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts) = 0; // --- Store async interface --- - // TODO: Remove boost::shared_ptr<> from this interface - virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - - virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface + virtual void submitCreate(ConfigHandle&, + const DataSource* const, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(ConfigHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitCreate(QueueHandle&, + const DataSource* const, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(QueueHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitFlush(QueueHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitCreate(EventHandle&, + const DataSource* const, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + + virtual void submitEnqueue(EnqueueHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDequeue(EnqueueHandle&, + TxnHandle&, + boost::shared_ptr<BrokerAsyncContext>) = 0; // Legacy - Restore FTD message, is NOT async! - virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0; + virtual int loadContent(MessageHandle&, + QueueHandle&, + char* data, + uint64_t offset, + const uint64_t length) = 0; }; diff --git a/cpp/src/qpid/broker/ConfigHandle.cpp b/cpp/src/qpid/broker/ConfigHandle.cpp index 13f7e7fa94..0bd65543ae 100644 --- a/cpp/src/qpid/broker/ConfigHandle.cpp +++ b/cpp/src/qpid/broker/ConfigHandle.cpp @@ -23,23 +23,23 @@ #include "ConfigHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/ConfigHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<ConfigHandle> PrivateImpl; +typedef PrivateImplRef<ConfigHandle> PrivateImpl; ConfigHandle::ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::ConfigHandleImpl>() { PrivateImpl::ctor(*this, p); } ConfigHandle::ConfigHandle(const ConfigHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::ConfigHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h index 52a9d672d5..6bcb3d8ce0 100644 --- a/cpp/src/qpid/broker/ConfigHandle.h +++ b/cpp/src/qpid/broker/ConfigHandle.h @@ -21,19 +21,21 @@ * \file ConfigHandle.h */ -#ifndef qpid_broker_ConfigHandleImpl_h_ -#define qpid_broker_ConfigHandleImpl_h_ +#ifndef qpid_broker_ConfigHandle_h_ +#define qpid_broker_ConfigHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/ConfigHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class ConfigHandleImpl; +} namespace broker { -class ConfigHandle : public qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>, - public IdHandle +class ConfigHandle : public Handle<qpid::asyncStore::ConfigHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<ConfigHandle>; + friend class PrivateImplRef<ConfigHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_ConfigHandleImpl_h_ +#endif // qpid_broker_ConfigHandle_h_ diff --git a/cpp/src/qpid/broker/EnqueueHandle.cpp b/cpp/src/qpid/broker/EnqueueHandle.cpp index 3b8e2d5b30..877eb680a6 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.cpp +++ b/cpp/src/qpid/broker/EnqueueHandle.cpp @@ -23,23 +23,23 @@ #include "EnqueueHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/EnqueueHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<EnqueueHandle> PrivateImpl; +typedef PrivateImplRef<EnqueueHandle> PrivateImpl; EnqueueHandle::EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EnqueueHandleImpl>() { PrivateImpl::ctor(*this, p); } EnqueueHandle::EnqueueHandle(const EnqueueHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EnqueueHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h index cdd07e246b..f869a755b1 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.h +++ b/cpp/src/qpid/broker/EnqueueHandle.h @@ -21,19 +21,21 @@ * \file EnqueueHandle.h */ -#ifndef qpid_broker_EnqueueHandleImpl_h_ -#define qpid_broker_EnqueueHandleImpl_h_ +#ifndef qpid_broker_EnqueueHandle_h_ +#define qpid_broker_EnqueueHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/EnqueueHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class EnqueueHandleImpl; +} namespace broker { -class EnqueueHandle : public qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>, - public IdHandle +class EnqueueHandle : public Handle<qpid::asyncStore::EnqueueHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<EnqueueHandle>; + friend class PrivateImplRef<EnqueueHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_EnqueueHandleImpl_h_ +#endif // qpid_broker_EnqueueHandle_h_ diff --git a/cpp/src/qpid/broker/EventHandle.cpp b/cpp/src/qpid/broker/EventHandle.cpp index 97d5920837..81f33b59a6 100644 --- a/cpp/src/qpid/broker/EventHandle.cpp +++ b/cpp/src/qpid/broker/EventHandle.cpp @@ -23,23 +23,23 @@ #include "EventHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/EventHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<EventHandle> PrivateImpl; +typedef PrivateImplRef<EventHandle> PrivateImpl; EventHandle::EventHandle(qpid::asyncStore::EventHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EventHandleImpl>() { PrivateImpl::ctor(*this, p); } EventHandle::EventHandle(const EventHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::EventHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h index 20e7773502..31f0e22dbf 100644 --- a/cpp/src/qpid/broker/EventHandle.h +++ b/cpp/src/qpid/broker/EventHandle.h @@ -21,19 +21,23 @@ * \file EventHandle.h */ -#ifndef qpid_broker_EventHandleImpl_h_ -#define qpid_broker_EventHandleImpl_h_ +#ifndef qpid_broker_EventHandle_h_ +#define qpid_broker_EventHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/EventHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class EventHandleImpl; +} namespace broker { -class EventHandle : public qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>, - public IdHandle +class EventHandle : public Handle<qpid::asyncStore::EventHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: EventHandle(qpid::asyncStore::EventHandleImpl* p = 0); @@ -45,9 +49,9 @@ public: const std::string& getKey() const; private: - friend class qpid::messaging::PrivateImplRef<EventHandle>; + friend class PrivateImplRef<EventHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_EventHandleImpl_h_ +#endif // qpid_broker_EventHandle_h_ diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h new file mode 100644 index 0000000000..397f58f2e7 --- /dev/null +++ b/cpp/src/qpid/broker/Handle.h @@ -0,0 +1,83 @@ +#ifndef QPID_BROKER_HANDLE_H +#define QPID_BROKER_HANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * NOTE: This is a copy of qpid::messaging::Handle (but stripped of its + * messaging-specific Windows decoration macros) + * + * This (together with PrivateImplRef.h) has been placed here so + * as not to introduce unnecessary dependencies on qpid::messaging + * for users of the Handle template in the qpid::broker namespace. + * + * Any fixes made here should also be made to qpid/messaging/Handle.h + * + * TODO: Find the correct Windows decorations for these functions. + * TODO: Find (if possible) a way to eliminate two copies of the same code. + */ + +namespace qpid { +namespace broker { + +template <class> class PrivateImplRef; + +/** \ingroup messaging + * A handle is like a pointer: refers to an underlying implementation object. + * Copying the handle does not copy the object. + * + * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the + * conversion to bool to test for a null handle. + */ +template <class T> class Handle { + public: + + /**@return true if handle is valid, i.e. not null. */ + bool isValid() const { return impl; } + + /**@return true if handle is null. It is an error to call any function on a null handle. */ + bool isNull() const { return !impl; } + + /** Conversion to bool supports idiom if (handle) { handle->... } */ + operator bool() const { return impl; } + + /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ + bool operator !() const { return !impl; } + + void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; } + + protected: + typedef T Impl; + Handle() :impl() {} + + // Not implemented,subclasses must implement. + Handle(const Handle&); + Handle& operator=(const Handle&); + + Impl* impl; + + friend class PrivateImplRef<T>; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_HANDLE_H*/ diff --git a/cpp/src/qpid/broker/IdHandle.cpp b/cpp/src/qpid/broker/IdHandle.cpp deleted file mode 100644 index ebb8f9a3c6..0000000000 --- a/cpp/src/qpid/broker/IdHandle.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file IdHandle.cpp - */ - -#include "IdHandle.h" - -namespace qpid { -namespace broker { - -IdHandle::~IdHandle() -{} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandle.cpp b/cpp/src/qpid/broker/MessageHandle.cpp index 9a7b631e8b..2727e74edc 100644 --- a/cpp/src/qpid/broker/MessageHandle.cpp +++ b/cpp/src/qpid/broker/MessageHandle.cpp @@ -23,22 +23,23 @@ #include "MessageHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/MessageHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<MessageHandle> PrivateImpl; +typedef PrivateImplRef<MessageHandle> PrivateImpl; MessageHandle::MessageHandle(qpid::asyncStore::MessageHandleImpl* p) : - IdHandle() + Handle<qpid::asyncStore::MessageHandleImpl>() { PrivateImpl::ctor(*this, p); } MessageHandle::MessageHandle(const MessageHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::MessageHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 739c53f7d3..e0a68a8878 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -21,19 +21,21 @@ * \file MessageHandle.h */ -#ifndef qpid_broker_MessageHandleImpl_h_ -#define qpid_broker_MessageHandleImpl_h_ +#ifndef qpid_broker_MessageHandle_h_ +#define qpid_broker_MessageHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/MessageHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" namespace qpid { +namespace asyncStore { +class MessageHandleImpl; +} namespace broker { -class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, - public IdHandle +class MessageHandle : public Handle<qpid::asyncStore::MessageHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0); @@ -45,9 +47,9 @@ public: // <none> private: - friend class qpid::messaging::PrivateImplRef<MessageHandle>; + friend class PrivateImplRef<MessageHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_MessageHandleImpl_h_ +#endif // qpid_broker_MessageHandle_h_ diff --git a/cpp/src/qpid/broker/PrivateImplRef.h b/cpp/src/qpid/broker/PrivateImplRef.h new file mode 100644 index 0000000000..5932ab882b --- /dev/null +++ b/cpp/src/qpid/broker/PrivateImplRef.h @@ -0,0 +1,105 @@ +#ifndef QPID_BROKER_PRIVATEIMPLREF_H +#define QPID_BROKER_PRIVATEIMPLREF_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * NOTE: This is a copy of qpid::messaging::PrivateImplRef + * + * This (together with Handle.h) has been placed here so + * as not to introduce unnecessary dependencies on qpid::messaging + * for users of the Handle template in the qpid::broker namespace. + * + * Any fixes made here should also be made to qpid/messaging/PrivateImplRef.h + * + * TODO: Find (if possible) a way to eliminate two copies of the same code. + */ + +#include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> class PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + typedef boost::intrusive_ptr<Impl> intrusive_ptr; + + /** Get the implementation pointer from a handle */ + static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const intrusive_ptr& p) { + if (t.impl == p) return; + if (t.impl) boost::intrusive_ptr_release(t.impl); + t.impl = p.get(); + if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PRIVATEIMPLREF_H*/ diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp index 780eb6395c..5a5678df5a 100644 --- a/cpp/src/qpid/broker/QueueHandle.cpp +++ b/cpp/src/qpid/broker/QueueHandle.cpp @@ -23,22 +23,23 @@ #include "QueueHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/QueueHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<QueueHandle> PrivateImpl; +typedef PrivateImplRef<QueueHandle> PrivateImpl; QueueHandle::QueueHandle(qpid::asyncStore::QueueHandleImpl* p) : - IdHandle() + Handle<qpid::asyncStore::QueueHandleImpl>() { PrivateImpl::ctor(*this, p); } QueueHandle::QueueHandle(const QueueHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::QueueHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index cb366e2880..234c5e15e8 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -21,18 +21,23 @@ * \file QueueHandle.h */ -#ifndef qpid_broker_QueueHandleImpl_h_ -#define qpid_broker_QueueHandleImpl_h_ +#ifndef qpid_broker_QueueHandle_h_ +#define qpid_broker_QueueHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/QueueHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class QueueHandleImpl; +} namespace broker { -class QueueHandle : public qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>, public IdHandle +class QueueHandle : public Handle<qpid::asyncStore::QueueHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: QueueHandle(qpid::asyncStore::QueueHandleImpl* p = 0); @@ -44,9 +49,9 @@ public: const std::string& getName() const; private: - friend class qpid::messaging::PrivateImplRef<QueueHandle>; + friend class PrivateImplRef<QueueHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_QueueHandleImpl_h_ +#endif // qpid_broker_QueueHandle_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index e8abe99dab..c3d4342993 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -23,8 +23,6 @@ #include "TxnAsyncContext.h" -#include <cassert> - namespace qpid { namespace broker { @@ -38,9 +36,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, m_op(op), m_rcb(rcb), m_arq(arq) -{ - assert(m_th.isValid()); -} +{} TxnAsyncContext::~TxnAsyncContext() {} @@ -63,7 +59,7 @@ TxnAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -TxnHandle +TxnHandle& TxnAsyncContext::getTransactionContext() const { return m_th; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9bdd8f7188..810c46429c 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -25,15 +25,16 @@ #define qpid_broker_TxnAsyncContext_h_ #include "AsyncStore.h" // qpid::broker::BrokerAsyncContext -#include "TxnHandle.h" #include "qpid/asyncStore/AsyncOperation.h" -#include <boost/shared_ptr.hpp> - namespace qpid { namespace broker { +class TxnHandle; + +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + class TxnAsyncContext: public BrokerAsyncContext { public: @@ -46,7 +47,7 @@ public: TxnBuffer* getTxnBuffer() const; qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - TxnHandle getTransactionContext() const; + TxnHandle& getTransactionContext() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; @@ -54,7 +55,7 @@ public: private: TxnBuffer* const m_tb; - TxnHandle m_th; + TxnHandle& m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index b975f09448..425d725e9e 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -24,13 +24,10 @@ #include "TxnBuffer.h" #include "AsyncResultHandle.h" -#include "AsyncStore.h" #include "TxnAsyncContext.h" #include "TxnOp.h" -#include "qpid/Exception.h" - -#include <boost/shared_ptr.hpp> +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -47,7 +44,6 @@ TxnBuffer::~TxnBuffer() void TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) { -//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); m_ops.push_back(op); } @@ -55,7 +51,6 @@ TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) bool TxnBuffer::prepare(TxnHandle& th) { -//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { if (!(*i)->prepare(th)) { @@ -68,7 +63,6 @@ TxnBuffer::prepare(TxnHandle& th) void TxnBuffer::commit() { -//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); @@ -79,7 +73,6 @@ TxnBuffer::commit() void TxnBuffer::rollback() { -//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush; qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); @@ -88,17 +81,16 @@ TxnBuffer::rollback() } bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) +TxnBuffer::commitLocal(AsyncTransaction* const store) { -//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush; if (store) { try { m_store = store; asyncLocalCommit(); } catch (std::exception& e) { - std::cerr << "Commit failed: " << e.what() << std::endl; + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); } catch (...) { - std::cerr << "Commit failed (unknown exception)" << std::endl; + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); } } return false; @@ -111,11 +103,10 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { - std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); tac->getTxnBuffer()->asyncLocalAbort(); } else { -//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush; if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { tac->getTxnBuffer()->asyncLocalAbort(); } else { @@ -131,13 +122,11 @@ TxnBuffer::asyncLocalCommit() assert(m_store != 0); switch(m_state) { case NONE: -//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush; m_state = PREPARE; m_txnHandle = m_store->createTxnHandle(this); prepare(m_txnHandle); break; case PREPARE: -//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush; m_state = COMMIT; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, @@ -149,16 +138,12 @@ TxnBuffer::asyncLocalCommit() } break; case COMMIT: -//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush; commit(); m_state = COMPLETE; delete this; // TODO: ugly! Find a better way to handle the life cycle of this class break; -// case COMPLETE: -//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush; - break; + case COMPLETE: default: ; -//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush; } } @@ -170,7 +155,6 @@ TxnBuffer::asyncLocalAbort() case NONE: case PREPARE: case COMMIT: -//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush; m_state = ROLLBACK; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, @@ -182,31 +166,11 @@ TxnBuffer::asyncLocalAbort() } break; case ROLLBACK: -//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush; rollback(); m_state = COMPLETE; delete this; // TODO: ugly! Find a better way to handle the life cycle of this class default: ; -//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush; - } -} - -// for debugging -/* -void -TxnBuffer::printState(std::ostream& os) -{ - os << "state="; - switch(m_state) { - case NONE: os << "NONE"; break; - case PREPARE: os << "PREPARE"; break; - case COMMIT: os << "COMMIT"; break; - case ROLLBACK: os << "ROLLBACK"; break; - case COMPLETE: os << "COMPLETE"; break; - default: os << m_state << "(unknown)"; } - os << "; " << m_ops.size() << "; store=" << m_store; } -*/ }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index ea8b73407f..cd78846b71 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -26,7 +26,8 @@ #include "TxnHandle.h" -//#include <boost/enable_shared_from_this.hpp> +#include "qpid/sys/Mutex.h" + #include <boost/shared_ptr.hpp> #include <vector> @@ -35,10 +36,10 @@ namespace broker { class AsyncResultHandle; class AsyncResultQueue; -class AsyncTransactionalStore; +class AsyncTransaction; class TxnOp; -class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ { +class TxnBuffer { public: TxnBuffer(AsyncResultQueue& arq); virtual ~TxnBuffer(); @@ -47,21 +48,18 @@ public: bool prepare(TxnHandle& th); void commit(); void rollback(); - bool commitLocal(AsyncTransactionalStore* const store); + bool commitLocal(AsyncTransaction* const store); // --- Async operations --- static void handleAsyncResult(const AsyncResultHandle* const arh); void asyncLocalCommit(); void asyncLocalAbort(); - // --- Debug --- - //void printState(std::ostream& os); - private: std::vector<boost::shared_ptr<TxnOp> > m_ops; qpid::sys::Mutex m_opsMutex; TxnHandle m_txnHandle; - AsyncTransactionalStore* m_store; + AsyncTransaction* m_store; AsyncResultQueue& m_resultQueue; typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp index 07d46b4235..58cedd586e 100644 --- a/cpp/src/qpid/broker/TxnHandle.cpp +++ b/cpp/src/qpid/broker/TxnHandle.cpp @@ -23,23 +23,23 @@ #include "TxnHandle.h" -#include "qpid/messaging/PrivateImplRef.h" +#include "PrivateImplRef.h" + +#include "qpid/asyncStore/TxnHandleImpl.h" namespace qpid { namespace broker { -typedef qpid::messaging::PrivateImplRef<TxnHandle> PrivateImpl; +typedef PrivateImplRef<TxnHandle> PrivateImpl; TxnHandle::TxnHandle(qpid::asyncStore::TxnHandleImpl* p) : - qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::TxnHandleImpl>() { PrivateImpl::ctor(*this, p); } TxnHandle::TxnHandle(const TxnHandle& r) : - qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(), - IdHandle() + Handle<qpid::asyncStore::TxnHandleImpl>() { PrivateImpl::copy(*this, r); } diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 8bed14e16a..7302490939 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -21,19 +21,23 @@ * \file TxnHandle.h */ -#ifndef qpid_broker_TxnHandleImpl_h_ -#define qpid_broker_TxnHandleImpl_h_ +#ifndef qpid_broker_TxnHandle_h_ +#define qpid_broker_TxnHandle_h_ -#include "IdHandle.h" +#include "Handle.h" -#include "qpid/asyncStore/TxnHandleImpl.h" -#include "qpid/messaging/Handle.h" +#include "qpid/asyncStore/AsyncStoreHandle.h" + +#include <string> namespace qpid { +namespace asyncStore { +class TxnHandleImpl; +} namespace broker { -class TxnHandle : public qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>, - public IdHandle +class TxnHandle : public Handle<qpid::asyncStore::TxnHandleImpl>, + public qpid::asyncStore::AsyncStoreHandle { public: TxnHandle(qpid::asyncStore::TxnHandleImpl* p = 0); @@ -48,9 +52,9 @@ public: void decrOpCnt(); private: - friend class qpid::messaging::PrivateImplRef<TxnHandle>; + friend class PrivateImplRef<TxnHandle>; }; }} // namespace qpid::broker -#endif // qpid_broker_TxnHandleImpl_h_ +#endif // qpid_broker_TxnHandle_h_ diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index ad9a518867..637442e128 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -344,78 +344,5 @@ add_library (dlclose_noop MODULE dlclose_noop.c) #check-long: # $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= - -# Async Store perf tests -# ---------------------- - -# New journal perf test (jrnl2Perf) -set (jrnl2Perf_SOURCES - storePerftools/jrnlPerf/Journal.cpp - storePerftools/jrnlPerf/JournalParameters.cpp - storePerftools/jrnlPerf/PerfTest.cpp - storePerftools/jrnlPerf/TestResult.cpp - - storePerftools/common/Parameters.cpp - storePerftools/common/PerftoolError.cpp - storePerftools/common/ScopedTimable.cpp - storePerftools/common/ScopedTimer.cpp - storePerftools/common/Streamable.cpp - storePerftools/common/TestParameters.cpp - storePerftools/common/TestResult.cpp - storePerftools/common/Thread.cpp -) - -if (UNIX) - add_executable (jrnl2Perf ${jrnl2Perf_SOURCES}) - set_target_properties (jrnl2Perf PROPERTIES - COMPILE_FLAGS "-DJOURNAL2" - ) - target_link_libraries (jrnl2Perf - asyncStore - qpidbroker - rt - ) -endif (UNIX) - -# Async store perf test (asyncPerf) -set (asyncStorePerf_SOURCES - storePerftools/asyncPerf/Deliverable.cpp - storePerftools/asyncPerf/DeliveryRecord.cpp - storePerftools/asyncPerf/MessageAsyncContext.cpp - storePerftools/asyncPerf/MessageConsumer.cpp - storePerftools/asyncPerf/MessageDeque.cpp - storePerftools/asyncPerf/MessageProducer.cpp - storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/QueueAsyncContext.cpp - storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimpleMessage.cpp - storePerftools/asyncPerf/SimpleQueue.cpp - storePerftools/asyncPerf/TestOptions.cpp - storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TxnAccept.cpp - storePerftools/asyncPerf/TxnPublish.cpp - - storePerftools/common/Parameters.cpp - storePerftools/common/PerftoolError.cpp - storePerftools/common/ScopedTimable.cpp - storePerftools/common/ScopedTimer.cpp - storePerftools/common/Streamable.cpp - storePerftools/common/TestOptions.cpp - storePerftools/common/TestResult.cpp - storePerftools/common/Thread.cpp -) - -if (UNIX) - add_executable (asyncStorePerf ${asyncStorePerf_SOURCES}) - set_target_properties (asyncStorePerf PROPERTIES - COMPILE_FLAGS "-DJOURNAL2" - ) - target_link_libraries (asyncStorePerf - boost_program_options - asyncStore - qpidbroker - qpidcommon - qpidtypes - rt - ) -endif (UNIX) +# Include async store tests +include(asyncstore.cmake) diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake new file mode 100644 index 0000000000..cd20394908 --- /dev/null +++ b/cpp/src/tests/asyncstore.cmake @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# +# Async store test CMake fragment, to be included in tests/CMakeLists.txt +# + +# New journal perf test (jrnl2Perf) +set (jrnl2Perf_SOURCES + storePerftools/jrnlPerf/Journal.cpp + storePerftools/jrnlPerf/JournalParameters.cpp + storePerftools/jrnlPerf/PerfTest.cpp + storePerftools/jrnlPerf/TestResult.cpp + + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestParameters.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp +) + +if (UNIX) + add_executable (jrnl2Perf ${jrnl2Perf_SOURCES}) + set_target_properties (jrnl2Perf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + ) + target_link_libraries (jrnl2Perf + asyncStore + qpidbroker + rt + ) +endif (UNIX) + +# Async store perf test (asyncPerf) +set (asyncStorePerf_SOURCES + storePerftools/asyncPerf/Deliverable.cpp + storePerftools/asyncPerf/DeliveryRecord.cpp + storePerftools/asyncPerf/MessageAsyncContext.cpp + storePerftools/asyncPerf/MessageConsumer.cpp + storePerftools/asyncPerf/MessageDeque.cpp + storePerftools/asyncPerf/MessageProducer.cpp + storePerftools/asyncPerf/PerfTest.cpp + storePerftools/asyncPerf/PersistableQueuedMessage.cpp + storePerftools/asyncPerf/QueueAsyncContext.cpp + storePerftools/asyncPerf/QueuedMessage.cpp + storePerftools/asyncPerf/SimpleMessage.cpp + storePerftools/asyncPerf/SimpleQueue.cpp + storePerftools/asyncPerf/TestOptions.cpp + storePerftools/asyncPerf/TestResult.cpp + storePerftools/asyncPerf/TxnAccept.cpp + storePerftools/asyncPerf/TxnPublish.cpp + + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestOptions.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp +) + +if (UNIX) + add_executable (asyncStorePerf ${asyncStorePerf_SOURCES}) + set_target_properties (asyncStorePerf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + ) + target_link_libraries (asyncStorePerf + boost_program_options + asyncStore + qpidbroker + qpidcommon + qpidtypes + rt + ) +endif (UNIX) diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp index b7250ecf40..1728a2dc1e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -31,7 +31,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -DeliveryRecord::DeliveryRecord(const QueuedMessage& qm, +DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, MessageConsumer& mc, bool accepted) : m_queuedMessage(qm), @@ -47,7 +47,7 @@ bool DeliveryRecord::accept() { if (!m_ended) { - m_queuedMessage.getQueue()->dequeue(m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(m_queuedMessage); m_accepted = true; setEnded(); } @@ -64,7 +64,7 @@ bool DeliveryRecord::setEnded() { m_ended = true; - m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0); + m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0); return isRedundant(); } @@ -83,7 +83,7 @@ DeliveryRecord::isRedundant() const void DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn) { - m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage); } void @@ -92,7 +92,7 @@ DeliveryRecord::committed() const m_msgConsumer.commitComplete(); } -QueuedMessage +boost::shared_ptr<QueuedMessage> DeliveryRecord::getQueuedMessage() const { return m_queuedMessage; diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h index 427cf846f0..bb89787737 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -26,6 +26,8 @@ #include "QueuedMessage.h" +#include <boost/shared_ptr.hpp> + namespace qpid { namespace broker { class TxnHandle; @@ -39,7 +41,7 @@ class MessageConsumer; class DeliveryRecord { public: - DeliveryRecord(const QueuedMessage& qm, + DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, MessageConsumer& mc, bool accepted); virtual ~DeliveryRecord(); @@ -50,9 +52,9 @@ public: bool isRedundant() const; void dequeue(qpid::broker::TxnHandle& txn); void committed() const; - QueuedMessage getQueuedMessage() const; + boost::shared_ptr<QueuedMessage> getQueuedMessage() const; private: - QueuedMessage m_queuedMessage; + boost::shared_ptr<QueuedMessage> m_queuedMessage; MessageConsumer& m_msgConsumer; bool m_accepted : 1; bool m_ended : 1; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp index c61ce352a1..8b79a91ac1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp @@ -42,7 +42,7 @@ MessageDeque::size() } bool -MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/) +MessageDeque::push(boost::shared_ptr<QueuedMessage>& added) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); m_messages.push_back(added); @@ -50,7 +50,7 @@ MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/) } bool -MessageDeque::consume(QueuedMessage& msg) +MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); if (!m_messages.empty()) { diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h index 75f422779e..021015f3e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h @@ -46,10 +46,10 @@ public: MessageDeque(); virtual ~MessageDeque(); uint32_t size(); - bool push(const QueuedMessage& added, QueuedMessage& removed); - bool consume(QueuedMessage& msg); + bool push(boost::shared_ptr<QueuedMessage>& added); + bool consume(boost::shared_ptr<QueuedMessage>& msg); private: - std::deque<QueuedMessage> m_messages; + std::deque<boost::shared_ptr<QueuedMessage> > m_messages; qpid::sys::Mutex m_msgMutex; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h index 9b5bd0be99..c1bfa328ea 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h +++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h @@ -30,6 +30,7 @@ #ifndef tests_storePerftools_asyncPerf_Messages_h_ #define tests_storePerftools_asyncPerf_Messages_h_ +#include <boost/shared_ptr.hpp> #include <stdint.h> namespace tests { @@ -43,8 +44,8 @@ class Messages public: virtual ~Messages() {} virtual uint32_t size() = 0; - virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; - virtual bool consume(QueuedMessage& msg) = 0; + virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0; + virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 4d145d321d..1497b678a0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -31,7 +31,10 @@ #include "tests/storePerftools/common/ScopedTimer.h" #include "tests/storePerftools/common/Thread.h" +#include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/asyncStore/AsyncStoreOptions.h" +#include "qpid/broker/AsyncStore.h" #include "qpid/sys/Poller.h" #include <iomanip> @@ -161,16 +164,15 @@ PerfTest::destroyQueues() } } -}}} // namespace tests::storePerftools::asyncPerf - -// ----------------------------------------------------------------- - int -main(int argc, char** argv) +runPerfTest(int argc, char** argv) { + // Load async store module + qpid::tryShlib ("asyncStore.so", false); + qpid::CommonOptions co; qpid::asyncStore::AsyncStoreOptions aso; - tests::storePerftools::asyncPerf::TestOptions to; + TestOptions to; qpid::Options opts; opts.add(co).add(aso).add(to); try { @@ -203,5 +205,16 @@ main(int argc, char** argv) // Print test result std::cout << apt << std::endl; //::sleep(1); + return 0; } + +}}} // namespace tests::storePerftools::asyncPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); +} diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 6cdf015f76..7cbb71322f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -36,6 +36,9 @@ #include <deque> namespace qpid { +namespace broker { +class AsyncStore; +} namespace asyncStore { class AsyncStoreImpl; class AsyncStoreOptions; @@ -83,6 +86,8 @@ private: }; +int runPerfTest(int argc, char** argv); + }}} // namespace tests::storePerftools::asyncPerf #endif // tests_storePerftools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp new file mode 100644 index 0000000000..2eba7d5be5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp @@ -0,0 +1,52 @@ +#include "PersistableQueuedMessage.h" + +#include "SimpleQueue.h" +#include "SimpleMessage.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +PersistableQueuedMessage::PersistableQueuedMessage() +{} + +PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg) : + QueuedMessage(q, msg), + m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) +{} + +PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) : + QueuedMessage(pm), + m_enqHandle(pm.m_enqHandle) +{} + +PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) : + QueuedMessage(pm), + m_enqHandle(pm->m_enqHandle) +{} + +PersistableQueuedMessage::~PersistableQueuedMessage() +{} + +PersistableQueuedMessage& +PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs) +{ + QueuedMessage::operator=(rhs); + m_enqHandle = rhs.m_enqHandle; + return *this; +} + +const qpid::broker::EnqueueHandle& +PersistableQueuedMessage::enqHandle() const +{ + return m_enqHandle; +} + +qpid::broker::EnqueueHandle& +PersistableQueuedMessage::enqHandle() +{ + return m_enqHandle; +} + +}}} // tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h new file mode 100644 index 0000000000..1e9446aa57 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h @@ -0,0 +1,31 @@ +#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ +#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ + +#include "QueuedMessage.h" + +#include "qpid/broker/EnqueueHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class PersistableQueuedMessage : public QueuedMessage { +public: + PersistableQueuedMessage(); + PersistableQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg); + PersistableQueuedMessage(const PersistableQueuedMessage& pqm); + PersistableQueuedMessage(PersistableQueuedMessage* const pqm); + virtual ~PersistableQueuedMessage(); + PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs); + + const qpid::broker::EnqueueHandle& enqHandle() const; + qpid::broker::EnqueueHandle& enqHandle(); + +private: + qpid::broker::EnqueueHandle m_enqHandle; +}; + +}}} // tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index 112a5ab1dd..3a8850c699 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -31,6 +31,10 @@ #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +namespace qpid { +namespace broker { +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); +}} namespace tests { namespace storePerftools { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 11af7c9466..a733a96171 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -38,28 +38,25 @@ QueuedMessage::QueuedMessage() : QueuedMessage::QueuedMessage(SimpleQueue* q, boost::intrusive_ptr<SimpleMessage> msg) : + boost::enable_shared_from_this<QueuedMessage>(), m_queue(q), - m_msg(msg), - m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) + m_msg(msg) {} QueuedMessage::QueuedMessage(const QueuedMessage& qm) : + boost::enable_shared_from_this<QueuedMessage>(), m_queue(qm.m_queue), - m_msg(qm.m_msg), - m_enqHandle(qm.m_enqHandle) + m_msg(qm.m_msg) {} -QueuedMessage::~QueuedMessage() +QueuedMessage::QueuedMessage(QueuedMessage* const qm) : + boost::enable_shared_from_this<QueuedMessage>(), + m_queue(qm->m_queue), + m_msg(qm->m_msg) {} -QueuedMessage& -QueuedMessage::operator=(const QueuedMessage& rhs) -{ - m_queue = rhs.m_queue; - m_msg = rhs.m_msg; - m_enqHandle = rhs.m_enqHandle; - return *this; -} +QueuedMessage::~QueuedMessage() +{} SimpleQueue* QueuedMessage::getQueue() const @@ -73,22 +70,10 @@ QueuedMessage::payload() const return m_msg; } -const qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() const -{ - return m_enqHandle; -} - -qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() -{ - return m_enqHandle; -} - void QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) { - m_queue->enqueue(th, *this); + m_queue->enqueue(th, shared_from_this()); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 12c8e4da08..7d4e5bbbe4 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -24,8 +24,9 @@ #ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ #define tests_storePerftools_asyncPerf_QueuedMessage_h_ -#include "qpid/broker/EnqueueHandle.h" +#include "qpid/broker/AsyncStore.h" +#include <boost/enable_shared_from_this.hpp> #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -42,19 +43,17 @@ namespace asyncPerf { class SimpleMessage; class SimpleQueue; -class QueuedMessage +class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage> { public: QueuedMessage(); QueuedMessage(SimpleQueue* q, boost::intrusive_ptr<SimpleMessage> msg); QueuedMessage(const QueuedMessage& qm); - ~QueuedMessage(); - QueuedMessage& operator=(const QueuedMessage& rhs); + QueuedMessage(QueuedMessage* const qm); + virtual ~QueuedMessage(); SimpleQueue* getQueue() const; boost::intrusive_ptr<SimpleMessage> payload() const; - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); // -- Transaction handling --- void prepareEnqueue(qpid::broker::TxnHandle& th); @@ -64,7 +63,6 @@ public: private: SimpleQueue* m_queue; boost::intrusive_ptr<SimpleMessage> m_msg; - qpid::broker::EnqueueHandle m_enqHandle; }; }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp index 29db6ceaf2..889f7a4cdd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp @@ -30,11 +30,20 @@ namespace storePerftools { namespace asyncPerf { SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_store(0), + m_msgHandle(qpid::broker::MessageHandle()) +{} + +SimpleMessage::SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store) : + qpid::broker::AsyncStore* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) + m_store(store), + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle()) {} SimpleMessage::~SimpleMessage() @@ -95,7 +104,7 @@ SimpleMessage::encodedHeaderSize() const bool SimpleMessage::isPersistent() const { - return m_msgHandle.isValid(); + return m_store != 0; } uint64_t diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h index 1b3e034814..01f54c1c19 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h @@ -28,13 +28,6 @@ #include "qpid/broker/MessageHandle.h" #include "qpid/broker/PersistableMessage.h" -#include <set> - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - namespace tests { namespace storePerftools { namespace asyncPerf { @@ -46,8 +39,10 @@ class SimpleMessage: public qpid::broker::PersistableMessage, { public: SimpleMessage(const char* msgData, + const uint32_t msgSize); + SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store); + qpid::broker::AsyncStore* store); virtual ~SimpleMessage(); const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); @@ -71,6 +66,8 @@ public: private: mutable uint64_t m_persistenceId; const std::string m_msg; + qpid::broker::AsyncStore* m_store; + qpid::broker::MessageHandle m_msgHandle; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 8bb79367ed..3bce2fb52a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -26,13 +26,15 @@ #include "DeliveryRecord.h" #include "MessageConsumer.h" #include "MessageDeque.h" -#include "SimpleMessage.h" +#include "PersistableQueuedMessage.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" +#include "SimpleMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/TxnHandle.h" + +#include <boost/make_shared.hpp> namespace tests { namespace storePerftools { @@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat SimpleQueue::SimpleQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq) : qpid::broker::PersistableQueue(), m_name(name), @@ -117,7 +119,7 @@ SimpleQueue::getHandle() return m_queueHandle; } -qpid::asyncStore::AsyncStoreImpl* +qpid::broker::AsyncStore* SimpleQueue::getStore() { return m_store; @@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } +//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm1.get()); enqueue(s_nullTxnHandle, qm); +//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm2.get()); push(qm); } bool SimpleQueue::dispatch(MessageConsumer& mc) { - QueuedMessage qm; + boost::shared_ptr<QueuedMessage> qm; if (m_messages->consume(qm)) { boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); mc.record(dr); @@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(QueuedMessage& qm) +SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { return enqueue(s_nullTxnHandle, qm); } bool SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return false; } bool -SimpleQueue::dequeue(QueuedMessage& qm) +SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { return dequeue(s_nullTxnHandle, qm); } bool SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); +//assert(qm.get()); +//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm.get()); +//return asyncDequeue(th, pqm); + return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return true; } @@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, void SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } push(qm); } @@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse() // private void -SimpleQueue::push(QueuedMessage& qm, +SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { - QueuedMessage removed; - m_messages->push(qm, removed); +boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +assert(pqm.get()); + + m_messages->push(qm); } // --- End Members & methods in msg handling path from qpid::Queue --- @@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm, // private bool SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { - qm.payload()->setPersistenceId(m_store->getNextRid()); -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(pqm.get()); +// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(qm.enqHandle(), + m_store->submitEnqueue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { + assert(pqm.get()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitDequeue(qm.enqHandle(), + m_store->submitDequeue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 59e12b5c93..81ea8b022b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -40,7 +40,6 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; -class TxnHandle; } namespace framing { class FieldTable; @@ -52,9 +51,10 @@ namespace asyncPerf { class MessageConsumer; class Messages; -class SimpleMessage; +class PersistableQueuedMessage; class QueueAsyncContext; class QueuedMessage; +class SimpleMessage; class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, public qpid::broker::PersistableQueue, @@ -63,14 +63,14 @@ class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, public: SimpleQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq); virtual ~SimpleQueue(); static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); - qpid::asyncStore::AsyncStoreImpl* getStore(); + qpid::broker::AsyncStore* getStore(); void asyncCreate(); void asyncDestroy(const bool deleteQueue); @@ -78,12 +78,12 @@ public: // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); bool dispatch(MessageConsumer& mc); - bool enqueue(QueuedMessage& qm); + bool enqueue(boost::shared_ptr<QueuedMessage> qm); bool enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - bool dequeue(QueuedMessage& qm); + boost::shared_ptr<QueuedMessage> qm); + bool dequeue(boost::shared_ptr<QueuedMessage> qm); bool dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<QueuedMessage> qm); void process(boost::intrusive_ptr<SimpleMessage> msg); void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); @@ -106,9 +106,9 @@ private: static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations const std::string m_name; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; @@ -135,14 +135,14 @@ private: }; UsageBarrier m_barrier; std::auto_ptr<Messages> m_messages; - void push(QueuedMessage& qm, + void push(boost::shared_ptr<QueuedMessage> qm, bool isRecovery = false); // -- Async ops --- bool asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<PersistableQueuedMessage> pqm); bool asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<PersistableQueuedMessage> pqm); // --- Async op counter --- void destroyCheck(const std::string& opDescr) const; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp index 3c5b99b5d5..7bede50272 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -25,6 +25,8 @@ #include "DeliveryRecord.h" +#include "qpid/log/Statement.h" + namespace tests { namespace storePerftools { namespace asyncPerf { @@ -41,15 +43,14 @@ TxnAccept::~TxnAccept() bool TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() { -//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush; try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->dequeue(th); } } catch (const std::exception& e) { - std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); } catch (...) { - std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); } return false; } @@ -57,23 +58,20 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() void TxnAccept::commit() throw() { -//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush; try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { (*i)->committed(); (*i)->setEnded(); } } catch (const std::exception& e) { - std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); } catch(...) { - std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); } } void TxnAccept::rollback() throw() -{ -//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush; -} +{} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 10e48bef82..0c34520d06 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -21,11 +21,16 @@ * \file TxnPublish.cpp */ -#include "SimpleMessage.h" -#include "SimpleQueue.h" // debug msg #include "TxnPublish.h" +#include "PersistableQueuedMessage.h" #include "QueuedMessage.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" // debug msg + +#include "qpid/log/Statement.h" + +#include <boost/make_shared.hpp> namespace tests { namespace storePerftools { @@ -33,9 +38,7 @@ namespace asyncPerf { TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : m_msg(msg) -{ -//std::cout << "TTT new TxnPublish" << std::endl << std::flush; -} +{} TxnPublish::~TxnPublish() {} @@ -43,7 +46,6 @@ TxnPublish::~TxnPublish() bool TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() { -//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush; try{ while (!m_queues.empty()) { m_queues.front()->prepareEnqueue(th); @@ -52,9 +54,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() } return true; } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); } return false; } @@ -62,30 +64,28 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() void TxnPublish::commit() throw() { -//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush; try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->commitEnqueue(); } } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); } } void TxnPublish::rollback() throw() { -//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush; try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->abortEnqueue(); } } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); } } @@ -98,8 +98,12 @@ TxnPublish::contentSize() void TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { -//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; - boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg)); + boost::shared_ptr<QueuedMessage> qm; + if (m_msg->isPersistent() && queue->getStore()) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)); + } m_queues.push_back(qm); m_delivered = true; } |