summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
commita804510d81ade0594a75b5c9b8765cafcc233245 (patch)
tree8c6be643564b6d8c88619d17de7150c98a314781
parent1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff)
downloadqpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt85
-rw-r--r--cpp/src/asyncstore.cmake86
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp8
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h12
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreHandle.h (renamed from cpp/src/qpid/broker/IdHandle.h)16
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp114
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h40
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreOptions.h8
-rw-r--r--cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/EnqueueHandleImpl.h1
-rw-r--r--cpp/src/qpid/asyncStore/EventHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/EventHandleImpl.h1
-rw-r--r--cpp/src/qpid/asyncStore/MessageHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/MessageHandleImpl.h1
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp8
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h2
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.h4
-rw-r--r--cpp/src/qpid/asyncStore/QueueHandleImpl.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/RunState.h2
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp4
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.cpp9
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.h11
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h91
-rw-r--r--cpp/src/qpid/broker/ConfigHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/ConfigHandle.h20
-rw-r--r--cpp/src/qpid/broker/EnqueueHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/EnqueueHandle.h20
-rw-r--r--cpp/src/qpid/broker/EventHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/EventHandle.h22
-rw-r--r--cpp/src/qpid/broker/Handle.h83
-rw-r--r--cpp/src/qpid/broker/IdHandle.cpp32
-rw-r--r--cpp/src/qpid/broker/MessageHandle.cpp11
-rw-r--r--cpp/src/qpid/broker/MessageHandle.h20
-rw-r--r--cpp/src/qpid/broker/PrivateImplRef.h105
-rw-r--r--cpp/src/qpid/broker/QueueHandle.cpp11
-rw-r--r--cpp/src/qpid/broker/QueueHandle.h21
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp8
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h11
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp50
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h14
-rw-r--r--cpp/src/qpid/broker/TxnHandle.cpp12
-rw-r--r--cpp/src/qpid/broker/TxnHandle.h22
-rw-r--r--cpp/src/tests/CMakeLists.txt77
-rw-r--r--cpp/src/tests/asyncstore.cmake94
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp10
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Messages.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h31
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp37
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h13
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp86
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h26
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp36
65 files changed, 952 insertions, 634 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 6077bade08..1f3c2a739d 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -666,6 +666,9 @@ include (rdma.cmake)
# Check for optional SSL support requirements
include (ssl.cmake)
+# Check for optional async store build requirements
+#include (asyncstore.cmake)
+
# Check for syslog capabilities not present on all systems
check_symbol_exists (LOG_AUTHPRIV "sys/syslog.h" HAVE_LOG_AUTHPRIV)
check_symbol_exists (LOG_FTP "sys/syslog.h" HAVE_LOG_FTP)
@@ -1147,6 +1150,13 @@ set (qpidbroker_SOURCES
qpid/management/ManagementDirectExchange.cpp
qpid/management/ManagementTopicExchange.cpp
qpid/sys/TCPIOPlugin.cpp
+# New async store objects and new versions of broker objects
+# qpid/broker/AsyncResultHandle.cpp
+# qpid/broker/AsyncResultHandleImpl.cpp
+# qpid/broker/IdHandle.cpp
+# qpid/broker/TxnAsyncContext.cpp
+# qpid/broker/TxnBuffer.cpp
+# qpid/broker/TxnHandle.cpp
)
add_msvc_version (qpidbroker library dll)
add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
@@ -1459,71 +1469,12 @@ if (UNIX)
COMPONENT ${QPID_COMPONENT_COMMON})
endif (UNIX)
-
# Async Store support
-
-# Journal 2 source files
-set (jrnl2_SOURCES
- qpid/asyncStore/jrnl2/AsyncJournal.cpp
- qpid/asyncStore/jrnl2/DataOpState.cpp
- qpid/asyncStore/jrnl2/DataToken.cpp
- qpid/asyncStore/jrnl2/DataWrComplState.cpp
- qpid/asyncStore/jrnl2/DequeueHeader.cpp
- qpid/asyncStore/jrnl2/EnqueueHeader.cpp
- qpid/asyncStore/jrnl2/EventHeader.cpp
- qpid/asyncStore/jrnl2/FileHeader.cpp
- qpid/asyncStore/jrnl2/JournalDirectory.cpp
- qpid/asyncStore/jrnl2/JournalError.cpp
- qpid/asyncStore/jrnl2/JournalParameters.cpp
- qpid/asyncStore/jrnl2/JournalRunState.cpp
- qpid/asyncStore/jrnl2/RecordHeader.cpp
- qpid/asyncStore/jrnl2/RecordTail.cpp
- qpid/asyncStore/jrnl2/ScopedLock.cpp
- qpid/asyncStore/jrnl2/Streamable.cpp
- qpid/asyncStore/jrnl2/TransactionHeader.cpp
-)
-
-# AsyncStore source files
-set (asyncStore_SOURCES
- qpid/asyncStore/AsyncOperation.cpp
- qpid/asyncStore/AsyncStoreImpl.cpp
- qpid/asyncStore/AsyncStoreOptions.cpp
- qpid/asyncStore/ConfigHandleImpl.cpp
- qpid/asyncStore/EnqueueHandleImpl.cpp
- qpid/asyncStore/EventHandleImpl.cpp
- qpid/asyncStore/MessageHandleImpl.cpp
- qpid/asyncStore/OperationQueue.cpp
- qpid/asyncStore/Plugin.cpp
- qpid/asyncStore/QueueHandleImpl.cpp
- qpid/asyncStore/RunState.cpp
- qpid/asyncStore/TxnHandleImpl.cpp
- qpid/broker/AsyncResultHandle.cpp
- qpid/broker/AsyncResultHandleImpl.cpp
- qpid/broker/AsyncResultQueueImpl.cpp
- qpid/broker/ConfigHandle.cpp
- qpid/broker/EnqueueHandle.cpp
- qpid/broker/EventHandle.cpp
- qpid/broker/IdHandle.cpp
- qpid/broker/MessageHandle.cpp
- qpid/broker/QueueHandle.cpp
- qpid/broker/TxnAsyncContext.cpp
- qpid/broker/TxnBuffer.cpp
- qpid/broker/TxnHandle.cpp
-)
-
-if (UNIX)
- add_library (asyncStore MODULE
- ${jrnl2_SOURCES}
- ${asyncStore_SOURCES}
- )
- set_target_properties (asyncStore PROPERTIES
- PREFIX ""
- OUTPUT_NAME asyncStore
- SOVERSION ${asyncStore_version}
- )
- target_link_libraries (asyncStore
- aio
- rt
- uuid
- )
-endif (UNIX)
+# -------------------
+# NOTE: Currently this must be included AFTER the "add_subdirectory(tests)"
+# line (a few lines above) to work - otherwise cmake complains of linking
+# to lib of type MODULE_LIBRARY (the fact that position of include makes a
+# difference might be a possible CMake bug?). The link issue is being
+# corrected separately, and when it is, this can be moved up to the location
+# of the other module includes.
+include (asyncstore.cmake)
diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake
new file mode 100644
index 0000000000..af9a358d1e
--- /dev/null
+++ b/cpp/src/asyncstore.cmake
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# Async store library CMake fragment, to be included in CMakeLists.txt
+#
+
+# Journal 2 source files
+set (jrnl2_SOURCES
+ qpid/asyncStore/jrnl2/AsyncJournal.cpp
+ qpid/asyncStore/jrnl2/DataOpState.cpp
+ qpid/asyncStore/jrnl2/DataToken.cpp
+ qpid/asyncStore/jrnl2/DataWrComplState.cpp
+ qpid/asyncStore/jrnl2/DequeueHeader.cpp
+ qpid/asyncStore/jrnl2/EnqueueHeader.cpp
+ qpid/asyncStore/jrnl2/EventHeader.cpp
+ qpid/asyncStore/jrnl2/FileHeader.cpp
+ qpid/asyncStore/jrnl2/JournalDirectory.cpp
+ qpid/asyncStore/jrnl2/JournalError.cpp
+ qpid/asyncStore/jrnl2/JournalParameters.cpp
+ qpid/asyncStore/jrnl2/JournalRunState.cpp
+ qpid/asyncStore/jrnl2/RecordHeader.cpp
+ qpid/asyncStore/jrnl2/RecordTail.cpp
+ qpid/asyncStore/jrnl2/ScopedLock.cpp
+ qpid/asyncStore/jrnl2/Streamable.cpp
+ qpid/asyncStore/jrnl2/TransactionHeader.cpp
+)
+
+# AsyncStore source files
+set (asyncStore_SOURCES
+ qpid/asyncStore/AsyncOperation.cpp
+ qpid/asyncStore/AsyncStoreImpl.cpp
+ qpid/asyncStore/AsyncStoreOptions.cpp
+ qpid/asyncStore/ConfigHandleImpl.cpp
+ qpid/asyncStore/EnqueueHandleImpl.cpp
+ qpid/asyncStore/EventHandleImpl.cpp
+ qpid/asyncStore/MessageHandleImpl.cpp
+ qpid/asyncStore/OperationQueue.cpp
+ qpid/asyncStore/Plugin.cpp
+ qpid/asyncStore/QueueHandleImpl.cpp
+ qpid/asyncStore/RunState.cpp
+ qpid/asyncStore/TxnHandleImpl.cpp
+ qpid/broker/AsyncResultHandle.cpp
+ qpid/broker/AsyncResultHandleImpl.cpp
+ qpid/broker/AsyncResultQueueImpl.cpp
+ qpid/broker/ConfigHandle.cpp
+ qpid/broker/EnqueueHandle.cpp
+ qpid/broker/EventHandle.cpp
+ qpid/broker/MessageHandle.cpp
+ qpid/broker/QueueHandle.cpp
+ qpid/broker/TxnAsyncContext.cpp
+ qpid/broker/TxnBuffer.cpp
+ qpid/broker/TxnHandle.cpp
+)
+
+if (UNIX)
+ add_library (asyncStore MODULE
+ ${jrnl2_SOURCES}
+ ${asyncStore_SOURCES}
+ )
+ set_target_properties (asyncStore PROPERTIES
+ PREFIX ""
+ OUTPUT_NAME asyncStore
+ SOVERSION ${asyncStore_version}
+ )
+ target_link_libraries (asyncStore
+ aio
+ rt
+ uuid
+ )
+endif (UNIX)
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index bcaac5c548..b8fdb8b140 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -38,7 +38,7 @@ AsyncOperation::AsyncOperation() :
{}
AsyncOperation::AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
m_targetHandle(th),
@@ -48,7 +48,7 @@ AsyncOperation::AsyncOperation(const opCode op,
{}
AsyncOperation::AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
@@ -59,7 +59,7 @@ AsyncOperation::AsyncOperation(const opCode op,
{}
AsyncOperation::AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::TxnHandle* txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
@@ -70,7 +70,7 @@ AsyncOperation::AsyncOperation(const opCode op,
{}
AsyncOperation::AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::DataSource* const dataSrc,
const qpid::broker::TxnHandle* txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index 188750e910..cb73ad639b 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -25,10 +25,10 @@
#define qpid_asyncStore_AsyncOperation_h_
#include "qpid/broker/AsyncStore.h"
-#include "qpid/broker/IdHandle.h"
namespace qpid {
namespace asyncStore {
+class AsyncStoreHandle;
class AsyncOperation {
public:
@@ -49,18 +49,18 @@ public:
AsyncOperation();
AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::TxnHandle* txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
- const qpid::broker::IdHandle* th,
+ const AsyncStoreHandle* th,
const qpid::broker::DataSource* const dataSrc,
const qpid::broker::TxnHandle* txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
@@ -71,7 +71,7 @@ public:
private:
opCode m_op;
- const qpid::broker::IdHandle* m_targetHandle;
+ const AsyncStoreHandle* m_targetHandle;
const qpid::broker::DataSource* const m_dataSrc;
const qpid::broker::TxnHandle* m_txnHandle;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
diff --git a/cpp/src/qpid/broker/IdHandle.h b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h
index ddf94a2396..97111d1b84 100644
--- a/cpp/src/qpid/broker/IdHandle.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreHandle.h
@@ -18,20 +18,20 @@
*/
/**
- * \file IdHandle.h
+ * \file AsyncStoreHandle.h
*/
-#ifndef qpid_broker_IdHandle_h_
-#define qpid_broker_IdHandle_h_
+#ifndef qpid_asyncStore_AsyncStoreHandle_h_
+#define qpid_asyncStore_AsyncStoreHandle_h_
namespace qpid {
-namespace broker {
+namespace asyncStore {
-class IdHandle {
+class AsyncStoreHandle {
public:
- virtual ~IdHandle();
+ virtual ~AsyncStoreHandle() {}
};
-}} // namespace qpid::broker
+}} // namespace qpid::asyncStore
-#endif // qpid_broker_IdHandle_h_
+#endif // qpid_asyncStore_AsyncStoreHandle_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 9135fcc27e..6b5c3ac582 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -23,7 +23,11 @@
#include "AsyncStoreImpl.h"
-#include "AsyncOperation.h"
+#include "ConfigHandleImpl.h"
+#include "EnqueueHandleImpl.h"
+#include "EventHandleImpl.h"
+#include "MessageHandleImpl.h"
+#include "QueueHandleImpl.h"
#include "TxnHandleImpl.h"
#include "qpid/broker/ConfigHandle.h"
@@ -33,8 +37,6 @@
#include "qpid/broker/QueueHandle.h"
#include "qpid/broker/TxnHandle.h"
-#include <boost/intrusive_ptr.hpp>
-
namespace qpid {
namespace asyncStore {
@@ -88,6 +90,36 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid,
return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
}
+void
+AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+{
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
+ dynamic_cast<AsyncStoreHandle*>(&txnHandle),
+ brokerCtxt));
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+{
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
+ dynamic_cast<AsyncStoreHandle*>(&txnHandle),
+ brokerCtxt));
+ m_operations.submit(op);
+}
+
+void
+AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+{
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
+ dynamic_cast<AsyncStoreHandle*>(&txnHandle),
+ brokerCtxt));
+ m_operations.submit(op);
+}
+
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
@@ -98,14 +130,16 @@ qpid::broker::EnqueueHandle
AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
qpid::broker::QueueHandle& queueHandle)
{
- return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
+ return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle,
+ queueHandle));
}
qpid::broker::EventHandle
AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle,
const std::string& key)
{
- return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key));
+ return qpid::broker::EventHandle(new EventHandleImpl(queueHandle,
+ key));
}
qpid::broker::MessageHandle
@@ -123,42 +157,12 @@ AsyncStoreImpl::createQueueHandle(const std::string& name,
}
void
-AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
-{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- brokerCtxt));
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
-{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- brokerCtxt));
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
-{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- brokerCtxt));
- m_operations.submit(op);
-}
-
-void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
dataSrc,
brokerCtxt));
m_operations.submit(op);
@@ -169,7 +173,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
brokerCtxt));
m_operations.submit(op);
}
@@ -180,7 +184,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ dynamic_cast<AsyncStoreHandle*>(&queueHandle),
dataSrc,
brokerCtxt));
m_operations.submit(op);
@@ -191,7 +195,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ dynamic_cast<AsyncStoreHandle*>(&queueHandle),
brokerCtxt));
m_operations.submit(op);
}
@@ -201,19 +205,7 @@ AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- brokerCtxt));
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
-{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- dataSrc,
+ dynamic_cast<AsyncStoreHandle*>(&queueHandle),
brokerCtxt));
m_operations.submit(op);
}
@@ -225,7 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dynamic_cast<AsyncStoreHandle*>(&eventHandle),
dataSrc,
&txnHandle,
brokerCtxt));
@@ -234,21 +226,11 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
-{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- brokerCtxt));
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dynamic_cast<AsyncStoreHandle*>(&eventHandle),
&txnHandle,
brokerCtxt));
m_operations.submit(op);
@@ -260,7 +242,7 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ dynamic_cast<AsyncStoreHandle*>(&enqHandle),
&txnHandle,
brokerCtxt));
m_operations.submit(op);
@@ -272,7 +254,7 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ dynamic_cast<AsyncStoreHandle*>(&enqHandle),
&txnHandle,
brokerCtxt));
m_operations.submit(op);
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index a00771abf5..7dee03dc6d 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -30,17 +30,19 @@
#include "qpid/asyncStore/jrnl2/RecordIdCounter.h"
#include "qpid/broker/AsyncStore.h"
-#include "qpid/sys/Poller.h"
namespace qpid {
-
namespace broker {
class Broker;
-} // namespace qpid::broker
+}
+
+namespace sys {
+class Poller;
+}
namespace asyncStore {
-class AsyncStoreImpl: public qpid::broker::AsyncStore {
+class AsyncStoreImpl : public qpid::broker::AsyncStore {
public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
const AsyncStoreOptions& opts);
@@ -52,12 +54,23 @@ public:
void initManagement(qpid::broker::Broker* broker);
- // --- Factory methods for creating handles ---
+ // --- Interface from AsyncTransactionalStore ---
qpid::broker::TxnHandle createTxnHandle();
qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid, qpid::broker::TxnBuffer* tb);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
+ qpid::broker::TxnBuffer* tb);
+
+ void submitPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ void submitCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ void submitAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+
+
+ // --- Interface from AsyncStore ---
qpid::broker::ConfigHandle createConfigHandle();
qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
@@ -68,16 +81,6 @@ public:
qpid::broker::QueueHandle createQueueHandle(const std::string& name,
const qpid::types::Variant::Map& opts);
-
- // --- Store async interface ---
-
- void submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- void submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- void submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
-
void submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
@@ -94,14 +97,9 @@ public:
void submitCreate(qpid::broker::EventHandle& eventHandle,
const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- void submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDestroy(qpid::broker::EventHandle& eventHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- void submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h
index 2849e3d2c6..a1325839d9 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreOptions.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.h
@@ -24,16 +24,11 @@
#ifndef qpid_asyncStore_AsyncStoreOptions_h_
#define qpid_asyncStore_AsyncStoreOptions_h_
-#include "qpid/asyncStore/jrnl2/Streamable.h"
-
#include "qpid/Options.h"
#include <string>
namespace qpid {
-namespace broker {
-class Options;
-}
namespace asyncStore {
class AsyncStoreOptions : public qpid::Options
@@ -49,7 +44,8 @@ public:
std::string m_storeDir;
private:
- // Static initialization race condition avoidance with static instance of Plugin class (using construct-on-first-use idiom).
+ // Static initialization race condition avoidance with static instance of Plugin class
+ // (using construct-on-first-use idiom).
static std::string& getDefaultStoreDir();
};
diff --git a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp
index 64e2e848fa..fc47f330a4 100644
--- a/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/ConfigHandleImpl.cpp
@@ -23,8 +23,6 @@
#include "ConfigHandleImpl.h"
-#include "qpid/messaging/PrivateImplRef.h"
-
namespace qpid {
namespace asyncStore {
diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp
index 0e291def78..be87831520 100644
--- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.cpp
@@ -23,8 +23,6 @@
#include "EnqueueHandleImpl.h"
-#include "qpid/messaging/PrivateImplRef.h"
-
namespace qpid {
namespace asyncStore {
diff --git a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h
index f976b6d246..2788366c5c 100644
--- a/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/EnqueueHandleImpl.h
@@ -27,7 +27,6 @@
#include "qpid/RefCounted.h"
namespace qpid {
-
namespace broker {
class MessageHandle;
class QueueHandle;
diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp
index 05b98d1e5d..4fd8805912 100644
--- a/cpp/src/qpid/asyncStore/EventHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/EventHandleImpl.cpp
@@ -23,8 +23,6 @@
#include "EventHandleImpl.h"
-#include "qpid/messaging/PrivateImplRef.h"
-
namespace qpid {
namespace asyncStore {
diff --git a/cpp/src/qpid/asyncStore/EventHandleImpl.h b/cpp/src/qpid/asyncStore/EventHandleImpl.h
index 1c6bc52f9f..9a0e694d12 100644
--- a/cpp/src/qpid/asyncStore/EventHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/EventHandleImpl.h
@@ -27,7 +27,6 @@
#include "qpid/RefCounted.h"
namespace qpid {
-
namespace broker {
class QueueHandle;
}
diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp
index cea039221a..a926aa161f 100644
--- a/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.cpp
@@ -23,8 +23,6 @@
#include "MessageHandleImpl.h"
-#include "qpid/messaging/PrivateImplRef.h"
-
namespace qpid {
namespace asyncStore {
diff --git a/cpp/src/qpid/asyncStore/MessageHandleImpl.h b/cpp/src/qpid/asyncStore/MessageHandleImpl.h
index eb80fca2d4..ded9d63375 100644
--- a/cpp/src/qpid/asyncStore/MessageHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/MessageHandleImpl.h
@@ -27,7 +27,6 @@
#include "qpid/RefCounted.h"
namespace qpid {
-
namespace broker {
class DataSource;
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 0b7f58bd6c..dd4e7c1343 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -24,6 +24,8 @@
#include "OperationQueue.h"
#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncResultHandleImpl.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace asyncStore {
@@ -42,7 +44,6 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op)
{
-//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -52,7 +53,6 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
if (bc) {
qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
@@ -64,9 +64,9 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
}
}
} catch (const std::exception& e) {
- std::cerr << "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what() << std::endl;
+ QPID_LOG(error, "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what());
} catch (...) {
- std::cerr << "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op" << std::endl;
+ QPID_LOG(error, "qpid::asyncStore::OperationQueue: Unknown exception thrown processing async op");
}
return e.end();
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index d2bd5d0f26..143ac2ab0c 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -26,7 +26,7 @@
#include "AsyncOperation.h"
-#include "qpid/broker/AsyncStore.h"
+//#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/PollableQueue.h"
namespace qpid {
diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp
index 4f35e8cd2a..400237df75 100644
--- a/cpp/src/qpid/asyncStore/Plugin.cpp
+++ b/cpp/src/qpid/asyncStore/Plugin.cpp
@@ -23,6 +23,8 @@
#include "Plugin.h"
+#include "AsyncStoreImpl.h"
+
#include "qpid/broker/Broker.h"
namespace qpid {
diff --git a/cpp/src/qpid/asyncStore/Plugin.h b/cpp/src/qpid/asyncStore/Plugin.h
index 7cf500a122..7d2b67bdad 100644
--- a/cpp/src/qpid/asyncStore/Plugin.h
+++ b/cpp/src/qpid/asyncStore/Plugin.h
@@ -24,13 +24,15 @@
#ifndef qpid_broker_Plugin_h_
#define qpid_broker_Plugin_h_
-#include "AsyncStoreImpl.h"
#include "AsyncStoreOptions.h"
#include "qpid/Plugin.h"
namespace qpid {
class Options;
+namespace asyncStore {
+class AsyncStoreImpl;
+}
namespace broker {
class Plugin : public qpid::Plugin
diff --git a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp
index 523a31ba7b..3781329b92 100644
--- a/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/QueueHandleImpl.cpp
@@ -23,8 +23,6 @@
#include "QueueHandleImpl.h"
-#include "qpid/messaging/PrivateImplRef.h"
-
namespace qpid {
namespace asyncStore {
diff --git a/cpp/src/qpid/asyncStore/RunState.h b/cpp/src/qpid/asyncStore/RunState.h
index dfa331ea1a..832e922a04 100644
--- a/cpp/src/qpid/asyncStore/RunState.h
+++ b/cpp/src/qpid/asyncStore/RunState.h
@@ -59,7 +59,7 @@ typedef enum {
RS_STOPPED
} RunState_t;
-class RunState: public qpid::asyncStore::jrnl2::State<RunState_t>
+class RunState : public qpid::asyncStore::jrnl2::State<RunState_t>
{
public:
RunState();
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index c5371f161c..2b343e9517 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -25,7 +25,7 @@
#include "qpid/Exception.h"
#include "qpid/broker/TxnBuffer.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/log/Statement.h"
#include <uuid/uuid.h>
@@ -117,7 +117,7 @@ TxnHandleImpl::createLocalXid()
char uuidStr[37]; // 36-char uuid + trailing '\0'
::uuid_unparse(uuid, uuidStr);
m_xid.assign(uuidStr);
-//std::cout << "TTT TxnHandleImpl::createLocalXid(): Local XID created: \"" << m_xid << "\"" << std::endl << std::flush;
+ QPID_LOG(debug, "Local XID created: \"" << m_xid << "\"");
}
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp
index cdd2231977..d2fa9ae3e0 100644
--- a/cpp/src/qpid/broker/AsyncResultHandle.cpp
+++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp
@@ -23,21 +23,22 @@
#include "AsyncResultHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "AsyncResultHandleImpl.h"
+#include "PrivateImplRef.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<AsyncResultHandle> PrivateImpl;
+typedef PrivateImplRef<AsyncResultHandle> PrivateImpl;
AsyncResultHandle::AsyncResultHandle(AsyncResultHandleImpl* p) :
- qpid::messaging::Handle<AsyncResultHandleImpl>()
+ Handle<AsyncResultHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
AsyncResultHandle::AsyncResultHandle(const AsyncResultHandle& r) :
- qpid::messaging::Handle<AsyncResultHandleImpl>()
+ Handle<AsyncResultHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h
index f916bde5d3..cf0fea5a06 100644
--- a/cpp/src/qpid/broker/AsyncResultHandle.h
+++ b/cpp/src/qpid/broker/AsyncResultHandle.h
@@ -24,14 +24,17 @@
#ifndef qpid_broker_AsyncResultHandle_h_
#define qpid_broker_AsyncResultHandle_h_
-#include "AsyncResultHandleImpl.h"
+#include "Handle.h"
-#include "qpid/messaging/Handle.h"
+#include <boost/shared_ptr.hpp>
+#include <string>
namespace qpid {
namespace broker {
+class AsyncResultHandleImpl;
+class BrokerAsyncContext;
-class AsyncResultHandle : public qpid::messaging::Handle<AsyncResultHandleImpl>
+class AsyncResultHandle : public Handle<AsyncResultHandleImpl>
{
public:
AsyncResultHandle(AsyncResultHandleImpl* p = 0);
@@ -47,7 +50,7 @@ public:
void invokeAsyncResultCallback() const;
private:
- friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>;
+ friend class PrivateImplRef<AsyncResultHandle>;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
index 62c7ed33d9..ab408be9ca 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
@@ -21,9 +21,12 @@
* \file AsyncResultQueueImpl.cpp
*/
-#include "AsyncResultHandle.h"
#include "AsyncResultQueueImpl.h"
+#include "AsyncResultHandle.h"
+
+#include "qpid/log/Statement.h"
+
namespace qpid {
namespace broker {
@@ -41,7 +44,6 @@ AsyncResultQueueImpl::~AsyncResultQueueImpl()
void
AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh)
{
-//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush;
m_resQueue.push(arh);
}
@@ -51,15 +53,14 @@ AsyncResultQueueImpl::handle(const ResultQueue::Batch& e)
{
try {
for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush;
if ((*i)->isValid()) {
(*i)->invokeAsyncResultCallback();
}
}
} catch (const std::exception& e) {
- std::cerr << "qpid::broker::AsyncResultQueueImpl: Exception thrown processing async result: " << e.what() << std::endl;
+ QPID_LOG(error, "Exception thrown processing async result: " << e.what());
} catch (...) {
- std::cerr << "qpid::broker::AsyncResultQueueImpl: Unknown exception thrown processing async result" << std::endl;
+ QPID_LOG(error, "Unknown exception thrown processing async result");
}
return e.end();
}
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 2f73ec8f3e..7bb6175862 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -20,12 +20,10 @@
#ifndef qpid_broker_AsyncStore_h_
#define qpid_broker_AsyncStore_h_
-// TODO: See if we can replace this with a forward declaration, but current definition of qpid::types::Variant::Map
-// does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration.
#include "qpid/types/Variant.h" // qpid::types::Variant::Map
#include <boost/shared_ptr.hpp>
-#include <stdint.h>
+#include <stdint.h> // uint64_t
#include <string>
namespace qpid {
@@ -57,66 +55,91 @@ public:
virtual void write(char* target) = 0;
};
-// Callback invoked by AsyncResultQueue to pass back async results
-typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+// Opaque async handles used for carrying persistence state.
class ConfigHandle;
class EnqueueHandle;
class EventHandle;
class MessageHandle;
class QueueHandle;
-class TxnBuffer;
class TxnHandle;
-class AsyncTransactionalStore {
+class TxnBuffer;
+
+class AsyncTransaction {
public:
- virtual ~AsyncTransactionalStore() {}
+ virtual ~AsyncTransaction() {}
virtual TxnHandle createTxnHandle() = 0;
virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid) = 0;
- virtual TxnHandle createTxnHandle(const std::string& xid, TxnBuffer* tb) = 0;
-
- // TODO: Remove boost::shared_ptr<> from this interface
- virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
- virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid,
+ TxnBuffer* tb) = 0;
+
+ // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
+ virtual void submitPrepare(TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
+ virtual void submitCommit(TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitAbort(TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
};
// Subclassed by store:
-class AsyncStore : public AsyncTransactionalStore {
+class AsyncStore : public AsyncTransaction {
public:
virtual ~AsyncStore() {}
// --- Factory methods for creating handles ---
virtual ConfigHandle createConfigHandle() = 0;
- virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0;
- virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0;
+ virtual EnqueueHandle createEnqueueHandle(MessageHandle&,
+ QueueHandle&) = 0;
+ virtual EventHandle createEventHandle(QueueHandle&,
+ const std::string& key=std::string()) = 0;
virtual MessageHandle createMessageHandle(const DataSource* const) = 0;
- virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
+ virtual QueueHandle createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts) = 0;
// --- Store async interface ---
- // TODO: Remove boost::shared_ptr<> from this interface
- virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
-
- virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
-
- virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
-
- virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
+ virtual void submitCreate(ConfigHandle&,
+ const DataSource* const,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(ConfigHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+
+ virtual void submitCreate(QueueHandle&,
+ const DataSource* const,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(QueueHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitFlush(QueueHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+
+ virtual void submitCreate(EventHandle&,
+ const DataSource* const,
+ TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(EventHandle&,
+ TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+
+ virtual void submitEnqueue(EnqueueHandle&,
+ TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDequeue(EnqueueHandle&,
+ TxnHandle&,
+ boost::shared_ptr<BrokerAsyncContext>) = 0;
// Legacy - Restore FTD message, is NOT async!
- virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0;
+ virtual int loadContent(MessageHandle&,
+ QueueHandle&,
+ char* data,
+ uint64_t offset,
+ const uint64_t length) = 0;
};
diff --git a/cpp/src/qpid/broker/ConfigHandle.cpp b/cpp/src/qpid/broker/ConfigHandle.cpp
index 13f7e7fa94..0bd65543ae 100644
--- a/cpp/src/qpid/broker/ConfigHandle.cpp
+++ b/cpp/src/qpid/broker/ConfigHandle.cpp
@@ -23,23 +23,23 @@
#include "ConfigHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/ConfigHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<ConfigHandle> PrivateImpl;
+typedef PrivateImplRef<ConfigHandle> PrivateImpl;
ConfigHandle::ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p) :
- qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::ConfigHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
ConfigHandle::ConfigHandle(const ConfigHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::ConfigHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h
index 52a9d672d5..6bcb3d8ce0 100644
--- a/cpp/src/qpid/broker/ConfigHandle.h
+++ b/cpp/src/qpid/broker/ConfigHandle.h
@@ -21,19 +21,21 @@
* \file ConfigHandle.h
*/
-#ifndef qpid_broker_ConfigHandleImpl_h_
-#define qpid_broker_ConfigHandleImpl_h_
+#ifndef qpid_broker_ConfigHandle_h_
+#define qpid_broker_ConfigHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/ConfigHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
namespace qpid {
+namespace asyncStore {
+class ConfigHandleImpl;
+}
namespace broker {
-class ConfigHandle : public qpid::messaging::Handle<qpid::asyncStore::ConfigHandleImpl>,
- public IdHandle
+class ConfigHandle : public Handle<qpid::asyncStore::ConfigHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
ConfigHandle(qpid::asyncStore::ConfigHandleImpl* p = 0);
@@ -45,9 +47,9 @@ public:
// <none>
private:
- friend class qpid::messaging::PrivateImplRef<ConfigHandle>;
+ friend class PrivateImplRef<ConfigHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_ConfigHandleImpl_h_
+#endif // qpid_broker_ConfigHandle_h_
diff --git a/cpp/src/qpid/broker/EnqueueHandle.cpp b/cpp/src/qpid/broker/EnqueueHandle.cpp
index 3b8e2d5b30..877eb680a6 100644
--- a/cpp/src/qpid/broker/EnqueueHandle.cpp
+++ b/cpp/src/qpid/broker/EnqueueHandle.cpp
@@ -23,23 +23,23 @@
#include "EnqueueHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/EnqueueHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<EnqueueHandle> PrivateImpl;
+typedef PrivateImplRef<EnqueueHandle> PrivateImpl;
EnqueueHandle::EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p) :
- qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::EnqueueHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
EnqueueHandle::EnqueueHandle(const EnqueueHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::EnqueueHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h
index cdd07e246b..f869a755b1 100644
--- a/cpp/src/qpid/broker/EnqueueHandle.h
+++ b/cpp/src/qpid/broker/EnqueueHandle.h
@@ -21,19 +21,21 @@
* \file EnqueueHandle.h
*/
-#ifndef qpid_broker_EnqueueHandleImpl_h_
-#define qpid_broker_EnqueueHandleImpl_h_
+#ifndef qpid_broker_EnqueueHandle_h_
+#define qpid_broker_EnqueueHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/EnqueueHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
namespace qpid {
+namespace asyncStore {
+class EnqueueHandleImpl;
+}
namespace broker {
-class EnqueueHandle : public qpid::messaging::Handle<qpid::asyncStore::EnqueueHandleImpl>,
- public IdHandle
+class EnqueueHandle : public Handle<qpid::asyncStore::EnqueueHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
EnqueueHandle(qpid::asyncStore::EnqueueHandleImpl* p = 0);
@@ -45,9 +47,9 @@ public:
// <none>
private:
- friend class qpid::messaging::PrivateImplRef<EnqueueHandle>;
+ friend class PrivateImplRef<EnqueueHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_EnqueueHandleImpl_h_
+#endif // qpid_broker_EnqueueHandle_h_
diff --git a/cpp/src/qpid/broker/EventHandle.cpp b/cpp/src/qpid/broker/EventHandle.cpp
index 97d5920837..81f33b59a6 100644
--- a/cpp/src/qpid/broker/EventHandle.cpp
+++ b/cpp/src/qpid/broker/EventHandle.cpp
@@ -23,23 +23,23 @@
#include "EventHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/EventHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<EventHandle> PrivateImpl;
+typedef PrivateImplRef<EventHandle> PrivateImpl;
EventHandle::EventHandle(qpid::asyncStore::EventHandleImpl* p) :
- qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::EventHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
EventHandle::EventHandle(const EventHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::EventHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h
index 20e7773502..31f0e22dbf 100644
--- a/cpp/src/qpid/broker/EventHandle.h
+++ b/cpp/src/qpid/broker/EventHandle.h
@@ -21,19 +21,23 @@
* \file EventHandle.h
*/
-#ifndef qpid_broker_EventHandleImpl_h_
-#define qpid_broker_EventHandleImpl_h_
+#ifndef qpid_broker_EventHandle_h_
+#define qpid_broker_EventHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/EventHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
+
+#include <string>
namespace qpid {
+namespace asyncStore {
+class EventHandleImpl;
+}
namespace broker {
-class EventHandle : public qpid::messaging::Handle<qpid::asyncStore::EventHandleImpl>,
- public IdHandle
+class EventHandle : public Handle<qpid::asyncStore::EventHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
EventHandle(qpid::asyncStore::EventHandleImpl* p = 0);
@@ -45,9 +49,9 @@ public:
const std::string& getKey() const;
private:
- friend class qpid::messaging::PrivateImplRef<EventHandle>;
+ friend class PrivateImplRef<EventHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_EventHandleImpl_h_
+#endif // qpid_broker_EventHandle_h_
diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h
new file mode 100644
index 0000000000..397f58f2e7
--- /dev/null
+++ b/cpp/src/qpid/broker/Handle.h
@@ -0,0 +1,83 @@
+#ifndef QPID_BROKER_HANDLE_H
+#define QPID_BROKER_HANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * NOTE: This is a copy of qpid::messaging::Handle (but stripped of its
+ * messaging-specific Windows decoration macros)
+ *
+ * This (together with PrivateImplRef.h) has been placed here so
+ * as not to introduce unnecessary dependencies on qpid::messaging
+ * for users of the Handle template in the qpid::broker namespace.
+ *
+ * Any fixes made here should also be made to qpid/messaging/Handle.h
+ *
+ * TODO: Find the correct Windows decorations for these functions.
+ * TODO: Find (if possible) a way to eliminate two copies of the same code.
+ */
+
+namespace qpid {
+namespace broker {
+
+template <class> class PrivateImplRef;
+
+/** \ingroup messaging
+ * A handle is like a pointer: refers to an underlying implementation object.
+ * Copying the handle does not copy the object.
+ *
+ * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
+ * conversion to bool to test for a null handle.
+ */
+template <class T> class Handle {
+ public:
+
+ /**@return true if handle is valid, i.e. not null. */
+ bool isValid() const { return impl; }
+
+ /**@return true if handle is null. It is an error to call any function on a null handle. */
+ bool isNull() const { return !impl; }
+
+ /** Conversion to bool supports idiom if (handle) { handle->... } */
+ operator bool() const { return impl; }
+
+ /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
+ bool operator !() const { return !impl; }
+
+ void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
+
+ protected:
+ typedef T Impl;
+ Handle() :impl() {}
+
+ // Not implemented,subclasses must implement.
+ Handle(const Handle&);
+ Handle& operator=(const Handle&);
+
+ Impl* impl;
+
+ friend class PrivateImplRef<T>;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_HANDLE_H*/
diff --git a/cpp/src/qpid/broker/IdHandle.cpp b/cpp/src/qpid/broker/IdHandle.cpp
deleted file mode 100644
index ebb8f9a3c6..0000000000
--- a/cpp/src/qpid/broker/IdHandle.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file IdHandle.cpp
- */
-
-#include "IdHandle.h"
-
-namespace qpid {
-namespace broker {
-
-IdHandle::~IdHandle()
-{}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandle.cpp b/cpp/src/qpid/broker/MessageHandle.cpp
index 9a7b631e8b..2727e74edc 100644
--- a/cpp/src/qpid/broker/MessageHandle.cpp
+++ b/cpp/src/qpid/broker/MessageHandle.cpp
@@ -23,22 +23,23 @@
#include "MessageHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/MessageHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<MessageHandle> PrivateImpl;
+typedef PrivateImplRef<MessageHandle> PrivateImpl;
MessageHandle::MessageHandle(qpid::asyncStore::MessageHandleImpl* p) :
- IdHandle()
+ Handle<qpid::asyncStore::MessageHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
MessageHandle::MessageHandle(const MessageHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::MessageHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h
index 739c53f7d3..e0a68a8878 100644
--- a/cpp/src/qpid/broker/MessageHandle.h
+++ b/cpp/src/qpid/broker/MessageHandle.h
@@ -21,19 +21,21 @@
* \file MessageHandle.h
*/
-#ifndef qpid_broker_MessageHandleImpl_h_
-#define qpid_broker_MessageHandleImpl_h_
+#ifndef qpid_broker_MessageHandle_h_
+#define qpid_broker_MessageHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/MessageHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
namespace qpid {
+namespace asyncStore {
+class MessageHandleImpl;
+}
namespace broker {
-class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>,
- public IdHandle
+class MessageHandle : public Handle<qpid::asyncStore::MessageHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0);
@@ -45,9 +47,9 @@ public:
// <none>
private:
- friend class qpid::messaging::PrivateImplRef<MessageHandle>;
+ friend class PrivateImplRef<MessageHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_MessageHandleImpl_h_
+#endif // qpid_broker_MessageHandle_h_
diff --git a/cpp/src/qpid/broker/PrivateImplRef.h b/cpp/src/qpid/broker/PrivateImplRef.h
new file mode 100644
index 0000000000..5932ab882b
--- /dev/null
+++ b/cpp/src/qpid/broker/PrivateImplRef.h
@@ -0,0 +1,105 @@
+#ifndef QPID_BROKER_PRIVATEIMPLREF_H
+#define QPID_BROKER_PRIVATEIMPLREF_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * NOTE: This is a copy of qpid::messaging::PrivateImplRef
+ *
+ * This (together with Handle.h) has been placed here so
+ * as not to introduce unnecessary dependencies on qpid::messaging
+ * for users of the Handle template in the qpid::broker namespace.
+ *
+ * Any fixes made here should also be made to qpid/messaging/PrivateImplRef.h
+ *
+ * TODO: Find (if possible) a way to eliminate two copies of the same code.
+ */
+
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Helper class to implement a class with a private, reference counted
+ * implementation and reference semantics.
+ *
+ * Such classes are used in the public API to hide implementation, they
+ * should. Example of use:
+ *
+ * === Foo.h
+ *
+ * template <class T> class PrivateImplRef;
+ * class FooImpl;
+ *
+ * Foo : public Handle<FooImpl> {
+ * public:
+ * Foo(FooImpl* = 0);
+ * Foo(const Foo&);
+ * ~Foo();
+ * Foo& operator=(const Foo&);
+ *
+ * int fooDo(); // and other Foo functions...
+ *
+ * private:
+ * typedef FooImpl Impl;
+ * Impl* impl;
+ * friend class PrivateImplRef<Foo>;
+ *
+ * === Foo.cpp
+ *
+ * typedef PrivateImplRef<Foo> PI;
+ * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); }
+ * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); }
+ * Foo::~Foo() { PI::dtor(*this); }
+ * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); }
+ *
+ * int foo::fooDo() { return impl->fooDo(); }
+ *
+ */
+template <class T> class PrivateImplRef {
+ public:
+ typedef typename T::Impl Impl;
+ typedef boost::intrusive_ptr<Impl> intrusive_ptr;
+
+ /** Get the implementation pointer from a handle */
+ static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
+
+ /** Set the implementation pointer in a handle */
+ static void set(T& t, const intrusive_ptr& p) {
+ if (t.impl == p) return;
+ if (t.impl) boost::intrusive_ptr_release(t.impl);
+ t.impl = p.get();
+ if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ }
+
+ // Helper functions to implement the ctor, dtor, copy, assign
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
+ static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PRIVATEIMPLREF_H*/
diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp
index 780eb6395c..5a5678df5a 100644
--- a/cpp/src/qpid/broker/QueueHandle.cpp
+++ b/cpp/src/qpid/broker/QueueHandle.cpp
@@ -23,22 +23,23 @@
#include "QueueHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/QueueHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<QueueHandle> PrivateImpl;
+typedef PrivateImplRef<QueueHandle> PrivateImpl;
QueueHandle::QueueHandle(qpid::asyncStore::QueueHandleImpl* p) :
- IdHandle()
+ Handle<qpid::asyncStore::QueueHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
QueueHandle::QueueHandle(const QueueHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::QueueHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h
index cb366e2880..234c5e15e8 100644
--- a/cpp/src/qpid/broker/QueueHandle.h
+++ b/cpp/src/qpid/broker/QueueHandle.h
@@ -21,18 +21,23 @@
* \file QueueHandle.h
*/
-#ifndef qpid_broker_QueueHandleImpl_h_
-#define qpid_broker_QueueHandleImpl_h_
+#ifndef qpid_broker_QueueHandle_h_
+#define qpid_broker_QueueHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/QueueHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
+
+#include <string>
namespace qpid {
+namespace asyncStore {
+class QueueHandleImpl;
+}
namespace broker {
-class QueueHandle : public qpid::messaging::Handle<qpid::asyncStore::QueueHandleImpl>, public IdHandle
+class QueueHandle : public Handle<qpid::asyncStore::QueueHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
QueueHandle(qpid::asyncStore::QueueHandleImpl* p = 0);
@@ -44,9 +49,9 @@ public:
const std::string& getName() const;
private:
- friend class qpid::messaging::PrivateImplRef<QueueHandle>;
+ friend class PrivateImplRef<QueueHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_QueueHandleImpl_h_
+#endif // qpid_broker_QueueHandle_h_
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
index e8abe99dab..c3d4342993 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.cpp
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -23,8 +23,6 @@
#include "TxnAsyncContext.h"
-#include <cassert>
-
namespace qpid {
namespace broker {
@@ -38,9 +36,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
m_op(op),
m_rcb(rcb),
m_arq(arq)
-{
- assert(m_th.isValid());
-}
+{}
TxnAsyncContext::~TxnAsyncContext()
{}
@@ -63,7 +59,7 @@ TxnAsyncContext::getOpStr() const
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-TxnHandle
+TxnHandle&
TxnAsyncContext::getTransactionContext() const
{
return m_th;
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 9bdd8f7188..810c46429c 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -25,15 +25,16 @@
#define qpid_broker_TxnAsyncContext_h_
#include "AsyncStore.h" // qpid::broker::BrokerAsyncContext
-#include "TxnHandle.h"
#include "qpid/asyncStore/AsyncOperation.h"
-#include <boost/shared_ptr.hpp>
-
namespace qpid {
namespace broker {
+class TxnHandle;
+
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
class TxnAsyncContext: public BrokerAsyncContext
{
public:
@@ -46,7 +47,7 @@ public:
TxnBuffer* getTxnBuffer() const;
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- TxnHandle getTransactionContext() const;
+ TxnHandle& getTransactionContext() const;
// --- Interface BrokerAsyncContext ---
AsyncResultQueue* getAsyncResultQueue() const;
@@ -54,7 +55,7 @@ public:
private:
TxnBuffer* const m_tb;
- TxnHandle m_th;
+ TxnHandle& m_th;
const qpid::asyncStore::AsyncOperation::opCode m_op;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index b975f09448..425d725e9e 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -24,13 +24,10 @@
#include "TxnBuffer.h"
#include "AsyncResultHandle.h"
-#include "AsyncStore.h"
#include "TxnAsyncContext.h"
#include "TxnOp.h"
-#include "qpid/Exception.h"
-
-#include <boost/shared_ptr.hpp>
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -47,7 +44,6 @@ TxnBuffer::~TxnBuffer()
void
TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
{
-//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
@@ -55,7 +51,6 @@ TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
bool
TxnBuffer::prepare(TxnHandle& th)
{
-//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
if (!(*i)->prepare(th)) {
@@ -68,7 +63,6 @@ TxnBuffer::prepare(TxnHandle& th)
void
TxnBuffer::commit()
{
-//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
@@ -79,7 +73,6 @@ TxnBuffer::commit()
void
TxnBuffer::rollback()
{
-//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
@@ -88,17 +81,16 @@ TxnBuffer::rollback()
}
bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
+TxnBuffer::commitLocal(AsyncTransaction* const store)
{
-//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush;
if (store) {
try {
m_store = store;
asyncLocalCommit();
} catch (std::exception& e) {
- std::cerr << "Commit failed: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
} catch (...) {
- std::cerr << "Commit failed (unknown exception)" << std::endl;
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
}
}
return false;
@@ -111,11 +103,10 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
- std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
tac->getTxnBuffer()->asyncLocalAbort();
} else {
-//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush;
if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
tac->getTxnBuffer()->asyncLocalAbort();
} else {
@@ -131,13 +122,11 @@ TxnBuffer::asyncLocalCommit()
assert(m_store != 0);
switch(m_state) {
case NONE:
-//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush;
m_state = PREPARE;
m_txnHandle = m_store->createTxnHandle(this);
prepare(m_txnHandle);
break;
case PREPARE:
-//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush;
m_state = COMMIT;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
@@ -149,16 +138,12 @@ TxnBuffer::asyncLocalCommit()
}
break;
case COMMIT:
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush;
commit();
m_state = COMPLETE;
delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
break;
-// case COMPLETE:
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush;
- break;
+ case COMPLETE:
default: ;
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush;
}
}
@@ -170,7 +155,6 @@ TxnBuffer::asyncLocalAbort()
case NONE:
case PREPARE:
case COMMIT:
-//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush;
m_state = ROLLBACK;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
@@ -182,31 +166,11 @@ TxnBuffer::asyncLocalAbort()
}
break;
case ROLLBACK:
-//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush;
rollback();
m_state = COMPLETE;
delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
default: ;
-//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush;
- }
-}
-
-// for debugging
-/*
-void
-TxnBuffer::printState(std::ostream& os)
-{
- os << "state=";
- switch(m_state) {
- case NONE: os << "NONE"; break;
- case PREPARE: os << "PREPARE"; break;
- case COMMIT: os << "COMMIT"; break;
- case ROLLBACK: os << "ROLLBACK"; break;
- case COMPLETE: os << "COMPLETE"; break;
- default: os << m_state << "(unknown)";
}
- os << "; " << m_ops.size() << "; store=" << m_store;
}
-*/
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
index ea8b73407f..cd78846b71 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -26,7 +26,8 @@
#include "TxnHandle.h"
-//#include <boost/enable_shared_from_this.hpp>
+#include "qpid/sys/Mutex.h"
+
#include <boost/shared_ptr.hpp>
#include <vector>
@@ -35,10 +36,10 @@ namespace broker {
class AsyncResultHandle;
class AsyncResultQueue;
-class AsyncTransactionalStore;
+class AsyncTransaction;
class TxnOp;
-class TxnBuffer /*: public boost::enable_shared_from_this<TxnBuffer>*/ {
+class TxnBuffer {
public:
TxnBuffer(AsyncResultQueue& arq);
virtual ~TxnBuffer();
@@ -47,21 +48,18 @@ public:
bool prepare(TxnHandle& th);
void commit();
void rollback();
- bool commitLocal(AsyncTransactionalStore* const store);
+ bool commitLocal(AsyncTransaction* const store);
// --- Async operations ---
static void handleAsyncResult(const AsyncResultHandle* const arh);
void asyncLocalCommit();
void asyncLocalAbort();
- // --- Debug ---
- //void printState(std::ostream& os);
-
private:
std::vector<boost::shared_ptr<TxnOp> > m_ops;
qpid::sys::Mutex m_opsMutex;
TxnHandle m_txnHandle;
- AsyncTransactionalStore* m_store;
+ AsyncTransaction* m_store;
AsyncResultQueue& m_resultQueue;
typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState;
diff --git a/cpp/src/qpid/broker/TxnHandle.cpp b/cpp/src/qpid/broker/TxnHandle.cpp
index 07d46b4235..58cedd586e 100644
--- a/cpp/src/qpid/broker/TxnHandle.cpp
+++ b/cpp/src/qpid/broker/TxnHandle.cpp
@@ -23,23 +23,23 @@
#include "TxnHandle.h"
-#include "qpid/messaging/PrivateImplRef.h"
+#include "PrivateImplRef.h"
+
+#include "qpid/asyncStore/TxnHandleImpl.h"
namespace qpid {
namespace broker {
-typedef qpid::messaging::PrivateImplRef<TxnHandle> PrivateImpl;
+typedef PrivateImplRef<TxnHandle> PrivateImpl;
TxnHandle::TxnHandle(qpid::asyncStore::TxnHandleImpl* p) :
- qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::TxnHandleImpl>()
{
PrivateImpl::ctor(*this, p);
}
TxnHandle::TxnHandle(const TxnHandle& r) :
- qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>(),
- IdHandle()
+ Handle<qpid::asyncStore::TxnHandleImpl>()
{
PrivateImpl::copy(*this, r);
}
diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h
index 8bed14e16a..7302490939 100644
--- a/cpp/src/qpid/broker/TxnHandle.h
+++ b/cpp/src/qpid/broker/TxnHandle.h
@@ -21,19 +21,23 @@
* \file TxnHandle.h
*/
-#ifndef qpid_broker_TxnHandleImpl_h_
-#define qpid_broker_TxnHandleImpl_h_
+#ifndef qpid_broker_TxnHandle_h_
+#define qpid_broker_TxnHandle_h_
-#include "IdHandle.h"
+#include "Handle.h"
-#include "qpid/asyncStore/TxnHandleImpl.h"
-#include "qpid/messaging/Handle.h"
+#include "qpid/asyncStore/AsyncStoreHandle.h"
+
+#include <string>
namespace qpid {
+namespace asyncStore {
+class TxnHandleImpl;
+}
namespace broker {
-class TxnHandle : public qpid::messaging::Handle<qpid::asyncStore::TxnHandleImpl>,
- public IdHandle
+class TxnHandle : public Handle<qpid::asyncStore::TxnHandleImpl>,
+ public qpid::asyncStore::AsyncStoreHandle
{
public:
TxnHandle(qpid::asyncStore::TxnHandleImpl* p = 0);
@@ -48,9 +52,9 @@ public:
void decrOpCnt();
private:
- friend class qpid::messaging::PrivateImplRef<TxnHandle>;
+ friend class PrivateImplRef<TxnHandle>;
};
}} // namespace qpid::broker
-#endif // qpid_broker_TxnHandleImpl_h_
+#endif // qpid_broker_TxnHandle_h_
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index ad9a518867..637442e128 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -344,78 +344,5 @@ add_library (dlclose_noop MODULE dlclose_noop.c)
#check-long:
# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
-
-# Async Store perf tests
-# ----------------------
-
-# New journal perf test (jrnl2Perf)
-set (jrnl2Perf_SOURCES
- storePerftools/jrnlPerf/Journal.cpp
- storePerftools/jrnlPerf/JournalParameters.cpp
- storePerftools/jrnlPerf/PerfTest.cpp
- storePerftools/jrnlPerf/TestResult.cpp
-
- storePerftools/common/Parameters.cpp
- storePerftools/common/PerftoolError.cpp
- storePerftools/common/ScopedTimable.cpp
- storePerftools/common/ScopedTimer.cpp
- storePerftools/common/Streamable.cpp
- storePerftools/common/TestParameters.cpp
- storePerftools/common/TestResult.cpp
- storePerftools/common/Thread.cpp
-)
-
-if (UNIX)
- add_executable (jrnl2Perf ${jrnl2Perf_SOURCES})
- set_target_properties (jrnl2Perf PROPERTIES
- COMPILE_FLAGS "-DJOURNAL2"
- )
- target_link_libraries (jrnl2Perf
- asyncStore
- qpidbroker
- rt
- )
-endif (UNIX)
-
-# Async store perf test (asyncPerf)
-set (asyncStorePerf_SOURCES
- storePerftools/asyncPerf/Deliverable.cpp
- storePerftools/asyncPerf/DeliveryRecord.cpp
- storePerftools/asyncPerf/MessageAsyncContext.cpp
- storePerftools/asyncPerf/MessageConsumer.cpp
- storePerftools/asyncPerf/MessageDeque.cpp
- storePerftools/asyncPerf/MessageProducer.cpp
- storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/QueueAsyncContext.cpp
- storePerftools/asyncPerf/QueuedMessage.cpp
- storePerftools/asyncPerf/SimpleMessage.cpp
- storePerftools/asyncPerf/SimpleQueue.cpp
- storePerftools/asyncPerf/TestOptions.cpp
- storePerftools/asyncPerf/TestResult.cpp
- storePerftools/asyncPerf/TxnAccept.cpp
- storePerftools/asyncPerf/TxnPublish.cpp
-
- storePerftools/common/Parameters.cpp
- storePerftools/common/PerftoolError.cpp
- storePerftools/common/ScopedTimable.cpp
- storePerftools/common/ScopedTimer.cpp
- storePerftools/common/Streamable.cpp
- storePerftools/common/TestOptions.cpp
- storePerftools/common/TestResult.cpp
- storePerftools/common/Thread.cpp
-)
-
-if (UNIX)
- add_executable (asyncStorePerf ${asyncStorePerf_SOURCES})
- set_target_properties (asyncStorePerf PROPERTIES
- COMPILE_FLAGS "-DJOURNAL2"
- )
- target_link_libraries (asyncStorePerf
- boost_program_options
- asyncStore
- qpidbroker
- qpidcommon
- qpidtypes
- rt
- )
-endif (UNIX)
+# Include async store tests
+include(asyncstore.cmake)
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
new file mode 100644
index 0000000000..cd20394908
--- /dev/null
+++ b/cpp/src/tests/asyncstore.cmake
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# Async store test CMake fragment, to be included in tests/CMakeLists.txt
+#
+
+# New journal perf test (jrnl2Perf)
+set (jrnl2Perf_SOURCES
+ storePerftools/jrnlPerf/Journal.cpp
+ storePerftools/jrnlPerf/JournalParameters.cpp
+ storePerftools/jrnlPerf/PerfTest.cpp
+ storePerftools/jrnlPerf/TestResult.cpp
+
+ storePerftools/common/Parameters.cpp
+ storePerftools/common/PerftoolError.cpp
+ storePerftools/common/ScopedTimable.cpp
+ storePerftools/common/ScopedTimer.cpp
+ storePerftools/common/Streamable.cpp
+ storePerftools/common/TestParameters.cpp
+ storePerftools/common/TestResult.cpp
+ storePerftools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (jrnl2Perf ${jrnl2Perf_SOURCES})
+ set_target_properties (jrnl2Perf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ )
+ target_link_libraries (jrnl2Perf
+ asyncStore
+ qpidbroker
+ rt
+ )
+endif (UNIX)
+
+# Async store perf test (asyncPerf)
+set (asyncStorePerf_SOURCES
+ storePerftools/asyncPerf/Deliverable.cpp
+ storePerftools/asyncPerf/DeliveryRecord.cpp
+ storePerftools/asyncPerf/MessageAsyncContext.cpp
+ storePerftools/asyncPerf/MessageConsumer.cpp
+ storePerftools/asyncPerf/MessageDeque.cpp
+ storePerftools/asyncPerf/MessageProducer.cpp
+ storePerftools/asyncPerf/PerfTest.cpp
+ storePerftools/asyncPerf/PersistableQueuedMessage.cpp
+ storePerftools/asyncPerf/QueueAsyncContext.cpp
+ storePerftools/asyncPerf/QueuedMessage.cpp
+ storePerftools/asyncPerf/SimpleMessage.cpp
+ storePerftools/asyncPerf/SimpleQueue.cpp
+ storePerftools/asyncPerf/TestOptions.cpp
+ storePerftools/asyncPerf/TestResult.cpp
+ storePerftools/asyncPerf/TxnAccept.cpp
+ storePerftools/asyncPerf/TxnPublish.cpp
+
+ storePerftools/common/Parameters.cpp
+ storePerftools/common/PerftoolError.cpp
+ storePerftools/common/ScopedTimable.cpp
+ storePerftools/common/ScopedTimer.cpp
+ storePerftools/common/Streamable.cpp
+ storePerftools/common/TestOptions.cpp
+ storePerftools/common/TestResult.cpp
+ storePerftools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (asyncStorePerf ${asyncStorePerf_SOURCES})
+ set_target_properties (asyncStorePerf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ )
+ target_link_libraries (asyncStorePerf
+ boost_program_options
+ asyncStore
+ qpidbroker
+ qpidcommon
+ qpidtypes
+ rt
+ )
+endif (UNIX)
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
index b7250ecf40..1728a2dc1e 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -31,7 +31,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-DeliveryRecord::DeliveryRecord(const QueuedMessage& qm,
+DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
MessageConsumer& mc,
bool accepted) :
m_queuedMessage(qm),
@@ -47,7 +47,7 @@ bool
DeliveryRecord::accept()
{
if (!m_ended) {
- m_queuedMessage.getQueue()->dequeue(m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(m_queuedMessage);
m_accepted = true;
setEnded();
}
@@ -64,7 +64,7 @@ bool
DeliveryRecord::setEnded()
{
m_ended = true;
- m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0);
+ m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0);
return isRedundant();
}
@@ -83,7 +83,7 @@ DeliveryRecord::isRedundant() const
void
DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
{
- m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+ m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage);
}
void
@@ -92,7 +92,7 @@ DeliveryRecord::committed() const
m_msgConsumer.commitComplete();
}
-QueuedMessage
+boost::shared_ptr<QueuedMessage>
DeliveryRecord::getQueuedMessage() const
{
return m_queuedMessage;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
index 427cf846f0..bb89787737 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -26,6 +26,8 @@
#include "QueuedMessage.h"
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace broker {
class TxnHandle;
@@ -39,7 +41,7 @@ class MessageConsumer;
class DeliveryRecord {
public:
- DeliveryRecord(const QueuedMessage& qm,
+ DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
MessageConsumer& mc,
bool accepted);
virtual ~DeliveryRecord();
@@ -50,9 +52,9 @@ public:
bool isRedundant() const;
void dequeue(qpid::broker::TxnHandle& txn);
void committed() const;
- QueuedMessage getQueuedMessage() const;
+ boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
private:
- QueuedMessage m_queuedMessage;
+ boost::shared_ptr<QueuedMessage> m_queuedMessage;
MessageConsumer& m_msgConsumer;
bool m_accepted : 1;
bool m_ended : 1;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
index c61ce352a1..8b79a91ac1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
@@ -42,7 +42,7 @@ MessageDeque::size()
}
bool
-MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/)
+MessageDeque::push(boost::shared_ptr<QueuedMessage>& added)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
m_messages.push_back(added);
@@ -50,7 +50,7 @@ MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/)
}
bool
-MessageDeque::consume(QueuedMessage& msg)
+MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
if (!m_messages.empty()) {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
index 75f422779e..021015f3e0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
@@ -46,10 +46,10 @@ public:
MessageDeque();
virtual ~MessageDeque();
uint32_t size();
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- bool consume(QueuedMessage& msg);
+ bool push(boost::shared_ptr<QueuedMessage>& added);
+ bool consume(boost::shared_ptr<QueuedMessage>& msg);
private:
- std::deque<QueuedMessage> m_messages;
+ std::deque<boost::shared_ptr<QueuedMessage> > m_messages;
qpid::sys::Mutex m_msgMutex;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
index 9b5bd0be99..c1bfa328ea 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
@@ -30,6 +30,7 @@
#ifndef tests_storePerftools_asyncPerf_Messages_h_
#define tests_storePerftools_asyncPerf_Messages_h_
+#include <boost/shared_ptr.hpp>
#include <stdint.h>
namespace tests {
@@ -43,8 +44,8 @@ class Messages
public:
virtual ~Messages() {}
virtual uint32_t size() = 0;
- virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
- virtual bool consume(QueuedMessage& msg) = 0;
+ virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0;
+ virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 4d145d321d..1497b678a0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -31,7 +31,10 @@
#include "tests/storePerftools/common/ScopedTimer.h"
#include "tests/storePerftools/common/Thread.h"
+#include "qpid/Modules.h" // Use with loading store as module
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/asyncStore/AsyncStoreOptions.h"
+#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -161,16 +164,15 @@ PerfTest::destroyQueues()
}
}
-}}} // namespace tests::storePerftools::asyncPerf
-
-// -----------------------------------------------------------------
-
int
-main(int argc, char** argv)
+runPerfTest(int argc, char** argv)
{
+ // Load async store module
+ qpid::tryShlib ("asyncStore.so", false);
+
qpid::CommonOptions co;
qpid::asyncStore::AsyncStoreOptions aso;
- tests::storePerftools::asyncPerf::TestOptions to;
+ TestOptions to;
qpid::Options opts;
opts.add(co).add(aso).add(to);
try {
@@ -203,5 +205,16 @@ main(int argc, char** argv)
// Print test result
std::cout << apt << std::endl;
//::sleep(1);
+
return 0;
}
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+// -----------------------------------------------------------------
+
+int
+main(int argc, char** argv)
+{
+ return tests::storePerftools::asyncPerf::runPerfTest(argc, argv);
+}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 6cdf015f76..7cbb71322f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -36,6 +36,9 @@
#include <deque>
namespace qpid {
+namespace broker {
+class AsyncStore;
+}
namespace asyncStore {
class AsyncStoreImpl;
class AsyncStoreOptions;
@@ -83,6 +86,8 @@ private:
};
+int runPerfTest(int argc, char** argv);
+
}}} // namespace tests::storePerftools::asyncPerf
#endif // tests_storePerftools_asyncPerf_PerfTest_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
new file mode 100644
index 0000000000..2eba7d5be5
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp
@@ -0,0 +1,52 @@
+#include "PersistableQueuedMessage.h"
+
+#include "SimpleQueue.h"
+#include "SimpleMessage.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+PersistableQueuedMessage::PersistableQueuedMessage()
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg) :
+ QueuedMessage(q, msg),
+ m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) :
+ QueuedMessage(pm),
+ m_enqHandle(pm.m_enqHandle)
+{}
+
+PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) :
+ QueuedMessage(pm),
+ m_enqHandle(pm->m_enqHandle)
+{}
+
+PersistableQueuedMessage::~PersistableQueuedMessage()
+{}
+
+PersistableQueuedMessage&
+PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs)
+{
+ QueuedMessage::operator=(rhs);
+ m_enqHandle = rhs.m_enqHandle;
+ return *this;
+}
+
+const qpid::broker::EnqueueHandle&
+PersistableQueuedMessage::enqHandle() const
+{
+ return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+PersistableQueuedMessage::enqHandle()
+{
+ return m_enqHandle;
+}
+
+}}} // tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
new file mode 100644
index 0000000000..1e9446aa57
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h
@@ -0,0 +1,31 @@
+#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
+#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
+
+#include "QueuedMessage.h"
+
+#include "qpid/broker/EnqueueHandle.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class PersistableQueuedMessage : public QueuedMessage {
+public:
+ PersistableQueuedMessage();
+ PersistableQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg);
+ PersistableQueuedMessage(const PersistableQueuedMessage& pqm);
+ PersistableQueuedMessage(PersistableQueuedMessage* const pqm);
+ virtual ~PersistableQueuedMessage();
+ PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs);
+
+ const qpid::broker::EnqueueHandle& enqHandle() const;
+ qpid::broker::EnqueueHandle& enqHandle();
+
+private:
+ qpid::broker::EnqueueHandle m_enqHandle;
+};
+
+}}} // tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index 112a5ab1dd..3a8850c699 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -31,6 +31,10 @@
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
+namespace qpid {
+namespace broker {
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+}}
namespace tests {
namespace storePerftools {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 11af7c9466..a733a96171 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -38,28 +38,25 @@ QueuedMessage::QueuedMessage() :
QueuedMessage::QueuedMessage(SimpleQueue* q,
boost::intrusive_ptr<SimpleMessage> msg) :
+ boost::enable_shared_from_this<QueuedMessage>(),
m_queue(q),
- m_msg(msg),
- m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0))
+ m_msg(msg)
{}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
+ boost::enable_shared_from_this<QueuedMessage>(),
m_queue(qm.m_queue),
- m_msg(qm.m_msg),
- m_enqHandle(qm.m_enqHandle)
+ m_msg(qm.m_msg)
{}
-QueuedMessage::~QueuedMessage()
+QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
+ boost::enable_shared_from_this<QueuedMessage>(),
+ m_queue(qm->m_queue),
+ m_msg(qm->m_msg)
{}
-QueuedMessage&
-QueuedMessage::operator=(const QueuedMessage& rhs)
-{
- m_queue = rhs.m_queue;
- m_msg = rhs.m_msg;
- m_enqHandle = rhs.m_enqHandle;
- return *this;
-}
+QueuedMessage::~QueuedMessage()
+{}
SimpleQueue*
QueuedMessage::getQueue() const
@@ -73,22 +70,10 @@ QueuedMessage::payload() const
return m_msg;
}
-const qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle() const
-{
- return m_enqHandle;
-}
-
-qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle()
-{
- return m_enqHandle;
-}
-
void
QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
{
- m_queue->enqueue(th, *this);
+ m_queue->enqueue(th, shared_from_this());
}
void
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index 12c8e4da08..7d4e5bbbe4 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -24,8 +24,9 @@
#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_
#define tests_storePerftools_asyncPerf_QueuedMessage_h_
-#include "qpid/broker/EnqueueHandle.h"
+#include "qpid/broker/AsyncStore.h"
+#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -42,19 +43,17 @@ namespace asyncPerf {
class SimpleMessage;
class SimpleQueue;
-class QueuedMessage
+class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage>
{
public:
QueuedMessage();
QueuedMessage(SimpleQueue* q,
boost::intrusive_ptr<SimpleMessage> msg);
QueuedMessage(const QueuedMessage& qm);
- ~QueuedMessage();
- QueuedMessage& operator=(const QueuedMessage& rhs);
+ QueuedMessage(QueuedMessage* const qm);
+ virtual ~QueuedMessage();
SimpleQueue* getQueue() const;
boost::intrusive_ptr<SimpleMessage> payload() const;
- const qpid::broker::EnqueueHandle& enqHandle() const;
- qpid::broker::EnqueueHandle& enqHandle();
// -- Transaction handling ---
void prepareEnqueue(qpid::broker::TxnHandle& th);
@@ -64,7 +63,6 @@ public:
private:
SimpleQueue* m_queue;
boost::intrusive_ptr<SimpleMessage> m_msg;
- qpid::broker::EnqueueHandle m_enqHandle;
};
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
index 29db6ceaf2..889f7a4cdd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
@@ -30,11 +30,20 @@ namespace storePerftools {
namespace asyncPerf {
SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_store(0),
+ m_msgHandle(qpid::broker::MessageHandle())
+{}
+
+SimpleMessage::SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store) :
+ qpid::broker::AsyncStore* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
- m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0))
+ m_store(store),
+ m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle())
{}
SimpleMessage::~SimpleMessage()
@@ -95,7 +104,7 @@ SimpleMessage::encodedHeaderSize() const
bool
SimpleMessage::isPersistent() const
{
- return m_msgHandle.isValid();
+ return m_store != 0;
}
uint64_t
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
index 1b3e034814..01f54c1c19 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
@@ -28,13 +28,6 @@
#include "qpid/broker/MessageHandle.h"
#include "qpid/broker/PersistableMessage.h"
-#include <set>
-
-namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}}
-
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -46,8 +39,10 @@ class SimpleMessage: public qpid::broker::PersistableMessage,
{
public:
SimpleMessage(const char* msgData,
+ const uint32_t msgSize);
+ SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store);
+ qpid::broker::AsyncStore* store);
virtual ~SimpleMessage();
const qpid::broker::MessageHandle& getHandle() const;
qpid::broker::MessageHandle& getHandle();
@@ -71,6 +66,8 @@ public:
private:
mutable uint64_t m_persistenceId;
const std::string m_msg;
+ qpid::broker::AsyncStore* m_store;
+
qpid::broker::MessageHandle m_msgHandle;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 8bb79367ed..3bce2fb52a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -26,13 +26,15 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "SimpleMessage.h"
+#include "PersistableQueuedMessage.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/AsyncResultHandle.h"
-#include "qpid/broker/TxnHandle.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat
SimpleQueue::SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq) :
qpid::broker::PersistableQueue(),
m_name(name),
@@ -117,7 +119,7 @@ SimpleQueue::getHandle()
return m_queueHandle;
}
-qpid::asyncStore::AsyncStoreImpl*
+qpid::broker::AsyncStore*
SimpleQueue::getStore()
{
return m_store;
@@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
+//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm1.get());
enqueue(s_nullTxnHandle, qm);
+//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm2.get());
push(qm);
}
bool
SimpleQueue::dispatch(MessageConsumer& mc)
{
- QueuedMessage qm;
+ boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
mc.record(dr);
@@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(QueuedMessage& qm)
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
{
return enqueue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return false;
}
bool
-SimpleQueue::dequeue(QueuedMessage& qm)
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
{
return dequeue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->dequeueAsync(shared_from_this(), m_store);
+//assert(qm.get());
+//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm.get());
+//return asyncDequeue(th, pqm);
+ return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return true;
}
@@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
push(qm);
}
@@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse()
// private
void
-SimpleQueue::push(QueuedMessage& qm,
+SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
- QueuedMessage removed;
- m_messages->push(qm, removed);
+boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+assert(pqm.get());
+
+ m_messages->push(qm);
}
// --- End Members & methods in msg handling path from qpid::Queue ---
@@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm,
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
- qm.payload()->setPersistenceId(m_store->getNextRid());
-//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(pqm.get());
+// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(qm.enqHandle(),
+ m_store->submitEnqueue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
+ assert(pqm.get());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(qm.enqHandle(),
+ m_store->submitDequeue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 59e12b5c93..81ea8b022b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -40,7 +40,6 @@ class AsyncStoreImpl;
}
namespace broker {
class AsyncResultQueue;
-class TxnHandle;
}
namespace framing {
class FieldTable;
@@ -52,9 +51,10 @@ namespace asyncPerf {
class MessageConsumer;
class Messages;
-class SimpleMessage;
+class PersistableQueuedMessage;
class QueueAsyncContext;
class QueuedMessage;
+class SimpleMessage;
class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
public qpid::broker::PersistableQueue,
@@ -63,14 +63,14 @@ class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
public:
SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq);
virtual ~SimpleQueue();
static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
- qpid::asyncStore::AsyncStoreImpl* getStore();
+ qpid::broker::AsyncStore* getStore();
void asyncCreate();
void asyncDestroy(const bool deleteQueue);
@@ -78,12 +78,12 @@ public:
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
bool dispatch(MessageConsumer& mc);
- bool enqueue(QueuedMessage& qm);
+ bool enqueue(boost::shared_ptr<QueuedMessage> qm);
bool enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
- bool dequeue(QueuedMessage& qm);
+ boost::shared_ptr<QueuedMessage> qm);
+ bool dequeue(boost::shared_ptr<QueuedMessage> qm);
bool dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<QueuedMessage> qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -106,9 +106,9 @@ private:
static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations
const std::string m_name;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- qpid::asyncStore::AsyncOpCounter m_asyncOpCounter;
+ qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter!
mutable uint64_t m_persistenceId;
std::string m_persistableData;
qpid::broker::QueueHandle m_queueHandle;
@@ -135,14 +135,14 @@ private:
};
UsageBarrier m_barrier;
std::auto_ptr<Messages> m_messages;
- void push(QueuedMessage& qm,
+ void push(boost::shared_ptr<QueuedMessage> qm,
bool isRecovery = false);
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<PersistableQueuedMessage> pqm);
bool asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm);
+ boost::shared_ptr<PersistableQueuedMessage> pqm);
// --- Async op counter ---
void destroyCheck(const std::string& opDescr) const;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
index 3c5b99b5d5..7bede50272 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -25,6 +25,8 @@
#include "DeliveryRecord.h"
+#include "qpid/log/Statement.h"
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -41,15 +43,14 @@ TxnAccept::~TxnAccept()
bool
TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
{
-//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush;
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->dequeue(th);
}
} catch (const std::exception& e) {
- std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)");
}
return false;
}
@@ -57,23 +58,20 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
void
TxnAccept::commit() throw()
{
-//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush;
try {
for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
(*i)->committed();
(*i)->setEnded();
}
} catch (const std::exception& e) {
- std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what());
} catch(...) {
- std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)");
}
}
void
TxnAccept::rollback() throw()
-{
-//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush;
-}
+{}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 10e48bef82..0c34520d06 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -21,11 +21,16 @@
* \file TxnPublish.cpp
*/
-#include "SimpleMessage.h"
-#include "SimpleQueue.h" // debug msg
#include "TxnPublish.h"
+#include "PersistableQueuedMessage.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h" // debug msg
+
+#include "qpid/log/Statement.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -33,9 +38,7 @@ namespace asyncPerf {
TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
-{
-//std::cout << "TTT new TxnPublish" << std::endl << std::flush;
-}
+{}
TxnPublish::~TxnPublish()
{}
@@ -43,7 +46,6 @@ TxnPublish::~TxnPublish()
bool
TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
{
-//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush;
try{
while (!m_queues.empty()) {
m_queues.front()->prepareEnqueue(th);
@@ -52,9 +54,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
return true;
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)");
}
return false;
}
@@ -62,30 +64,28 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
void
TxnPublish::commit() throw()
{
-//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)");
}
}
void
TxnPublish::rollback() throw()
{
-//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush;
try {
for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what());
} catch (...) {
- std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl;
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)");
}
}
@@ -98,8 +98,12 @@ TxnPublish::contentSize()
void
TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
-//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush;
- boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg));
+ boost::shared_ptr<QueuedMessage> qm;
+ if (m_msg->isPersistent() && queue->getStore()) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(queue.get(), m_msg));
+ }
m_queues.push_back(qm);
m_delivered = true;
}