summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-01 14:05:21 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-01 14:05:21 +0000
commit80bfab9ed823cebd9f8f58b559fd32df108bcf7d (patch)
tree191bf724b9bf5b8394343d60ac4eac804e9c3d3a /cpp/src/qpid/broker
parent63c6598f401ac6406e5a31c602c7892b798536fc (diff)
downloadqpid-python-80bfab9ed823cebd9f8f58b559fd32df108bcf7d.tar.gz
QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h8
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h8
-rw-r--r--cpp/src/qpid/broker/SimpleConsumer.h42
-rw-r--r--cpp/src/qpid/broker/SimpleDeliverable.cpp40
-rw-r--r--cpp/src/qpid/broker/SimpleDeliverable.h53
-rw-r--r--cpp/src/qpid/broker/SimpleDeliveryRecord.cpp92
-rw-r--r--cpp/src/qpid/broker/SimpleDeliveryRecord.h59
-rw-r--r--cpp/src/qpid/broker/SimpleMessage.cpp108
-rw-r--r--cpp/src/qpid/broker/SimpleMessage.h73
-rw-r--r--cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp59
-rw-r--r--cpp/src/qpid/broker/SimpleMessageAsyncContext.h55
-rw-r--r--cpp/src/qpid/broker/SimpleMessageDeque.cpp59
-rw-r--r--cpp/src/qpid/broker/SimpleMessageDeque.h57
-rw-r--r--cpp/src/qpid/broker/SimpleMessages.h52
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.cpp448
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.h155
-rw-r--r--cpp/src/qpid/broker/SimpleQueuedMessage.cpp98
-rw-r--r--cpp/src/qpid/broker/SimpleQueuedMessage.h66
-rw-r--r--cpp/src/qpid/broker/SimpleTxnAccept.cpp73
-rw-r--r--cpp/src/qpid/broker/SimpleTxnAccept.h52
-rw-r--r--cpp/src/qpid/broker/SimpleTxnBuffer.cpp (renamed from cpp/src/qpid/broker/TxnBuffer.cpp)52
-rw-r--r--cpp/src/qpid/broker/SimpleTxnBuffer.h (renamed from cpp/src/qpid/broker/TxnBuffer.h)22
-rw-r--r--cpp/src/qpid/broker/SimpleTxnOp.h (renamed from cpp/src/qpid/broker/TxnOp.h)16
-rw-r--r--cpp/src/qpid/broker/SimpleTxnPublish.cpp101
-rw-r--r--cpp/src/qpid/broker/SimpleTxnPublish.h67
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp4
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h12
28 files changed, 1871 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 6f1c02e059..7009565a7c 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -70,19 +70,19 @@ class TxnHandle;
class QueueAsyncContext;
class TpcTxnAsyncContext;
class TxnAsyncContext;
-class TxnBuffer;
+class SimpleTxnBuffer;
class AsyncTransactionalStore {
public:
virtual ~AsyncTransactionalStore() {}
virtual TxnHandle createTxnHandle() = 0;
- virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
+ virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag,
- TxnBuffer* tb) = 0;
+ SimpleTxnBuffer* tb) = 0;
virtual void submitPrepare(TxnHandle&,
boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only
@@ -94,7 +94,7 @@ public:
};
// Subclassed by store:
-class AsyncStore {
+class AsyncStore : public AsyncTransactionalStore {
public:
virtual ~AsyncStore() {}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp
index 4bd2d271eb..02eb2e9546 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.cpp
+++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp
@@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
{}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
@@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
@@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const
return m_msg;
}
-TxnBuffer*
+SimpleTxnBuffer*
QueueAsyncContext::getTxnBuffer() const {
return m_tb;
}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index e9ba2ebbac..8657922377 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -52,18 +52,18 @@ public:
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
boost::shared_ptr<PersistableQueue> getQueue() const;
boost::intrusive_ptr<PersistableMessage> getMessage() const;
- TxnBuffer* getTxnBuffer() const;
+ SimpleTxnBuffer* getTxnBuffer() const;
AsyncResultQueue* getAsyncResultQueue() const;
AsyncResultCallback getAsyncResultCallback() const;
void invokeCallback(const AsyncResultHandle* const arh) const;
@@ -72,7 +72,7 @@ public:
private:
boost::shared_ptr<PersistableQueue> m_q;
boost::intrusive_ptr<PersistableMessage> m_msg;
- TxnBuffer* m_tb;
+ SimpleTxnBuffer* m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h
new file mode 100644
index 0000000000..6601c65a42
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleConsumer.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleConsumer.h
+ */
+
+#ifndef qpid_broker_SimpleConsumer_h_
+#define qpid_broker_SimpleConsumer_h_
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class SimpleDeliveryRecord;
+
+class SimpleConsumer {
+public:
+ virtual ~SimpleConsumer() {}
+ virtual void commitComplete() = 0;
+ virtual void record(boost::shared_ptr<SimpleDeliveryRecord> dr) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleConsumer_h_
diff --git a/cpp/src/qpid/broker/SimpleDeliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp
new file mode 100644
index 0000000000..7037a377c5
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleDeliverable.cpp
+ */
+
+#include "SimpleDeliverable.h"
+
+namespace qpid {
+namespace broker {
+
+SimpleDeliverable::SimpleDeliverable() :
+ m_delivered(false)
+{}
+
+SimpleDeliverable::~SimpleDeliverable() {}
+
+bool
+SimpleDeliverable::isDelivered() const {
+ return m_delivered;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleDeliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h
new file mode 100644
index 0000000000..6441e14841
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleDeliverable.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleDeliverable.h
+ */
+
+#ifndef qpid_broker_SimpleDeliverable_h_
+#define qpid_broker_SimpleDeliverable_h_
+
+#include <boost/shared_ptr.hpp>
+#include <stdint.h> // uint64_t
+
+namespace qpid {
+namespace broker {
+
+class SimpleMessage;
+class SimpleQueue;
+
+class SimpleDeliverable
+{
+public:
+ SimpleDeliverable();
+ virtual ~SimpleDeliverable();
+
+ virtual uint64_t contentSize() = 0;
+ virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0;
+ virtual SimpleMessage& getMessage() = 0;
+ virtual bool isDelivered() const;
+
+protected:
+ bool m_delivered;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleDeliverable_h_
diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp
new file mode 100644
index 0000000000..b71df6975b
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleDeliveryRecord.cpp
+ */
+
+#include "SimpleDeliveryRecord.h"
+
+#include "SimpleConsumer.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
+#include "SimpleQueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm,
+ SimpleConsumer& sc,
+ bool accepted) :
+ m_queuedMessage(qm),
+ m_msgConsumer(sc),
+ m_accepted(accepted),
+ m_ended(accepted)
+{}
+
+SimpleDeliveryRecord::~SimpleDeliveryRecord() {}
+
+bool
+SimpleDeliveryRecord::accept() {
+ if (!m_ended) {
+ m_queuedMessage->getQueue()->dequeue(m_queuedMessage);
+ m_accepted = true;
+ setEnded();
+ }
+ return isRedundant();
+}
+
+bool
+SimpleDeliveryRecord::isAccepted() const {
+ return m_accepted;
+}
+
+bool
+SimpleDeliveryRecord::setEnded() {
+ m_ended = true;
+ m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0);
+ return isRedundant();
+}
+
+bool
+SimpleDeliveryRecord::isEnded() const {
+ return m_ended;
+}
+
+bool
+SimpleDeliveryRecord::isRedundant() const {
+ return m_ended;
+}
+
+void
+SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) {
+ m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
+}
+
+void
+SimpleDeliveryRecord::committed() const {
+ m_msgConsumer.commitComplete();
+}
+
+boost::shared_ptr<SimpleQueuedMessage>
+SimpleDeliveryRecord::getQueuedMessage() const {
+ return m_queuedMessage;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h
new file mode 100644
index 0000000000..622ce578d7
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleDeliveryRecord.h
+ */
+
+#ifndef qpid_broker_SimpleDeliveryRecord_h_
+#define qpid_broker_SimpleDeliveryRecord_h_
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SimpleConsumer;
+class SimpleQueuedMessage;
+class SimpleTxnBuffer;
+
+class SimpleDeliveryRecord {
+public:
+ SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm,
+ SimpleConsumer& sc,
+ bool accepted);
+ virtual ~SimpleDeliveryRecord();
+ bool accept();
+ bool isAccepted() const;
+ bool setEnded();
+ bool isEnded() const;
+ bool isRedundant() const;
+ void dequeue(qpid::broker::SimpleTxnBuffer* tb);
+ void committed() const;
+ boost::shared_ptr<SimpleQueuedMessage> getQueuedMessage() const;
+private:
+ boost::shared_ptr<SimpleQueuedMessage> m_queuedMessage;
+ SimpleConsumer& m_msgConsumer;
+ bool m_accepted : 1;
+ bool m_ended : 1;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleDeliveryRecord_h_
diff --git a/cpp/src/qpid/broker/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp
new file mode 100644
index 0000000000..1239533edf
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessage.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessage.cpp
+ */
+
+#include "SimpleMessage.h"
+
+#include <string.h> // memcpy()
+
+namespace qpid {
+namespace broker {
+
+SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_store(0),
+ m_msgHandle(MessageHandle())
+{}
+
+SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStore* store) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_store(store),
+ m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle())
+{}
+
+SimpleMessage::~SimpleMessage() {}
+
+const MessageHandle&
+SimpleMessage::getHandle() const {
+ return m_msgHandle;
+}
+
+MessageHandle&
+SimpleMessage::getHandle() {
+ return m_msgHandle;
+}
+
+uint64_t
+SimpleMessage::contentSize() const {
+ return static_cast<uint64_t>(m_msg.size());
+}
+
+void
+SimpleMessage::setPersistenceId(uint64_t id) const {
+ m_persistenceId = id;
+}
+
+uint64_t
+SimpleMessage::getPersistenceId() const {
+ return m_persistenceId;
+}
+
+void
+SimpleMessage::encode(qpid::framing::Buffer& buffer) const {
+ buffer.putRawData(m_msg);
+}
+
+uint32_t
+SimpleMessage::encodedSize() const {
+ return static_cast<uint32_t>(m_msg.size());
+}
+
+void
+SimpleMessage::allDequeuesComplete() {}
+
+uint32_t
+SimpleMessage::encodedHeaderSize() const {
+ return 0;
+}
+
+bool
+SimpleMessage::isPersistent() const {
+ return m_store != 0;
+}
+
+uint64_t
+SimpleMessage::getSize() {
+ return m_msg.size();
+}
+
+void
+SimpleMessage::write(char* target) {
+ ::memcpy(target, m_msg.data(), m_msg.size());
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h
new file mode 100644
index 0000000000..edfaa8d13b
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessage.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessage.h
+ */
+
+#ifndef qpid_broker_SimpleMessage_h_
+#define qpid_broker_SimpleMessage_h_
+
+#include "AsyncStore.h" // DataSource
+#include "MessageHandle.h"
+#include "PersistableMessage.h"
+
+namespace qpid {
+namespace broker {
+
+class SimpleMessage: public PersistableMessage,
+ public DataSource
+{
+public:
+ SimpleMessage(const char* msgData,
+ const uint32_t msgSize);
+ SimpleMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStore* store);
+ virtual ~SimpleMessage();
+ const MessageHandle& getHandle() const;
+ MessageHandle& getHandle();
+ uint64_t contentSize() const;
+
+ // --- Interface Persistable ---
+ virtual void setPersistenceId(uint64_t id) const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+
+ // --- Interface PersistableMessage ---
+ virtual void allDequeuesComplete();
+ virtual uint32_t encodedHeaderSize() const;
+ virtual bool isPersistent() const;
+
+ // --- Interface DataSource ---
+ virtual uint64_t getSize(); // <- same as encodedSize()?
+ virtual void write(char* target);
+
+private:
+ mutable uint64_t m_persistenceId;
+ const std::string m_msg;
+ AsyncStore* m_store;
+
+ MessageHandle m_msgHandle;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleMessage_h_
diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp
new file mode 100644
index 0000000000..a88258f5bc
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessageAsyncContext.cpp
+ */
+
+#include "SimpleMessageAsyncContext.h"
+
+#include "SimpleMessage.h"
+
+#include <cassert>
+
+namespace qpid {
+namespace broker {
+
+SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
+ boost::shared_ptr<SimpleQueue> q) :
+ m_msg(msg),
+ m_q(q)
+{
+ assert(m_msg.get() != 0);
+ assert(m_q.get() != 0);
+}
+
+SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {}
+
+boost::intrusive_ptr<SimpleMessage>
+SimpleMessageAsyncContext::getMessage() const {
+ return m_msg;
+}
+
+boost::shared_ptr<SimpleQueue>
+SimpleMessageAsyncContext::getQueue() const {
+ return m_q;
+}
+
+void
+SimpleMessageAsyncContext::destroy() {
+ delete this;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h
new file mode 100644
index 0000000000..e3975e790e
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessageAsyncContext.h
+ */
+
+#ifndef qpid_broker_SimpleMessageAsyncContext_h_
+#define qpid_broker_SimpleMessageAsyncContext_h_
+
+#include "AsyncStore.h" // BrokerAsyncContext
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SimpleMessage;
+class SimpleQueue;
+
+class SimpleMessageAsyncContext : public BrokerAsyncContext
+{
+public:
+ SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
+ boost::shared_ptr<SimpleQueue> q);
+ virtual ~SimpleMessageAsyncContext();
+ boost::intrusive_ptr<SimpleMessage> getMessage() const;
+ boost::shared_ptr<SimpleQueue> getQueue() const;
+ void destroy();
+
+private:
+ boost::intrusive_ptr<SimpleMessage> m_msg;
+ boost::shared_ptr<SimpleQueue> m_q;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleMessageAsyncContext_h_
diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp
new file mode 100644
index 0000000000..0aadcfd94a
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessageDeque.cpp
+ */
+
+#include "SimpleMessageDeque.h"
+
+#include "SimpleQueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+SimpleMessageDeque::SimpleMessageDeque() {}
+
+SimpleMessageDeque::~SimpleMessageDeque() {}
+
+uint32_t
+SimpleMessageDeque::size() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ return m_messages.size();
+}
+
+bool
+SimpleMessageDeque::push(boost::shared_ptr<SimpleQueuedMessage>& added) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ m_messages.push_back(added);
+ return false;
+}
+
+bool
+SimpleMessageDeque::consume(boost::shared_ptr<SimpleQueuedMessage>& msg) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ if (!m_messages.empty()) {
+ msg = m_messages.front();
+ m_messages.pop_front();
+ return true;
+ }
+ return false;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h
new file mode 100644
index 0000000000..5db0755a43
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessageDeque.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessageDeque.h
+ */
+
+/*
+ * This is a copy of qpid::broker::MessageDeque.h, but using the local
+ * SimpleQueuedMessage class instead of QueuedMessage.
+ */
+
+#ifndef qpid_broker_SimpleMessageDeque_h_
+#define qpid_broker_SimpleMessageDeque_h_
+
+#include "SimpleMessages.h"
+
+#include "qpid/sys/Mutex.h"
+
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+class SimpleMessageDeque : public SimpleMessages
+{
+public:
+ SimpleMessageDeque();
+ virtual ~SimpleMessageDeque();
+ uint32_t size();
+ bool push(boost::shared_ptr<SimpleQueuedMessage>& added);
+ bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg);
+private:
+ std::deque<boost::shared_ptr<SimpleQueuedMessage> > m_messages;
+ qpid::sys::Mutex m_msgMutex;
+
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleMessageDeque_h_
diff --git a/cpp/src/qpid/broker/SimpleMessages.h b/cpp/src/qpid/broker/SimpleMessages.h
new file mode 100644
index 0000000000..2a40859032
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleMessages.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleMessages.h
+ */
+
+/*
+ * This is a copy of qpid::broker::Messages.h, but using the local
+ * tests::storePerftools::asyncPerf::QueuedMessage class instead of
+ * qpid::broker::QueuedMessage.
+ */
+
+#ifndef qpid_broker_SimpleMessages_h_
+#define qpid_broker_SimpleMessages_h_
+
+#include <boost/shared_ptr.hpp>
+#include <stdint.h>
+
+namespace qpid {
+namespace broker {
+
+class SimpleQueuedMessage;
+
+class SimpleMessages
+{
+public:
+ virtual ~SimpleMessages() {}
+ virtual uint32_t size() = 0;
+ virtual bool push(boost::shared_ptr<SimpleQueuedMessage>& added) = 0;
+ virtual bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleMessages_h_
diff --git a/cpp/src/qpid/broker/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp
new file mode 100644
index 0000000000..5cd8841f94
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleQueue.cpp
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleQueue.cpp
+ */
+
+#include "SimpleQueue.h"
+
+#include "AsyncResultHandle.h"
+#include "QueueAsyncContext.h"
+#include "SimpleConsumer.h"
+#include "SimpleDeliveryRecord.h"
+#include "SimpleMessage.h"
+#include "SimpleMessageDeque.h"
+#include "SimpleQueuedMessage.h"
+#include "SimpleTxnBuffer.h"
+
+#include <string.h> // memcpy()
+
+namespace qpid {
+namespace broker {
+
+//static
+TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations
+
+
+SimpleQueue::SimpleQueue(const std::string& name,
+ const qpid::framing::FieldTable& /*args*/,
+ AsyncStore* store,
+ AsyncResultQueue& arq) :
+ PersistableQueue(),
+ m_name(name),
+ m_store(store),
+ m_resultQueue(arq),
+ m_asyncOpCounter(0UL),
+ m_persistenceId(0ULL),
+ m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
+ m_destroyPending(false),
+ m_destroyed(false),
+ m_barrier(*this),
+ m_messages(new SimpleMessageDeque())
+{
+ if (m_store != 0) {
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ }
+}
+
+SimpleQueue::~SimpleQueue() {}
+
+const QueueHandle&
+SimpleQueue::getHandle() const {
+ return m_queueHandle;
+}
+
+QueueHandle&
+SimpleQueue::getHandle() {
+ return m_queueHandle;
+}
+
+AsyncStore*
+SimpleQueue::getStore() {
+ return m_store;
+}
+
+void
+SimpleQueue::asyncCreate() {
+ if (m_store) {
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ &handleAsyncCreateResult,
+ &m_resultQueue));
+ m_store->submitCreate(m_queueHandle, this, qac);
+ ++m_asyncOpCounter;
+ }
+}
+
+//static
+void
+SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ sq->createComplete(qc);
+ }
+ }
+}
+
+void
+SimpleQueue::asyncDestroy(const bool deleteQueue)
+{
+ m_destroyPending = true;
+ if (m_store) {
+ if (deleteQueue) {
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ &handleAsyncDestroyResult,
+ &m_resultQueue));
+ m_store->submitDestroy(m_queueHandle, qac);
+ ++m_asyncOpCounter;
+ }
+ m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
+ }
+}
+
+//static
+void
+SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ sq->destroyComplete(qc);
+ }
+ }
+}
+
+void
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
+ boost::shared_ptr<SimpleQueuedMessage> qm(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg)));
+ enqueue(qm);
+ push(qm);
+}
+
+bool
+SimpleQueue::dispatch(SimpleConsumer& sc) {
+ boost::shared_ptr<SimpleQueuedMessage> qm;
+ if (m_messages->consume(qm)) {
+ boost::shared_ptr<SimpleDeliveryRecord> dr(new SimpleDeliveryRecord(qm, sc, false));
+ sc.record(dr);
+ return true;
+ }
+ return false;
+}
+
+bool
+SimpleQueue::enqueue(boost::shared_ptr<SimpleQueuedMessage> qm) {
+ return enqueue(0, qm);
+}
+
+bool
+SimpleQueue::enqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(tb, qm);
+ }
+ return false;
+}
+
+bool
+SimpleQueue::dequeue(boost::shared_ptr<SimpleQueuedMessage> qm) {
+ return dequeue(0, qm);
+}
+
+bool
+SimpleQueue::dequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->dequeueAsync(shared_from_this(), m_store);
+ return asyncDequeue(tb, qm);
+ }
+ return true;
+}
+
+void
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
+ push(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg)));
+}
+
+void
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
+
+void
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
+ buffer.putShortString(m_name);
+}
+
+uint32_t
+SimpleQueue::encodedSize() const {
+ return m_name.size() + 1;
+}
+
+uint64_t
+SimpleQueue::getPersistenceId() const {
+ return m_persistenceId;
+}
+
+void
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
+ m_persistenceId = persistenceId;
+}
+
+void
+SimpleQueue::flush() {
+ //if(m_store) m_store->flush(*this);
+}
+
+const std::string&
+SimpleQueue::getName() const {
+ return m_name;
+}
+
+void
+SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) {
+ if (externalQueueStore != inst && externalQueueStore)
+ delete externalQueueStore;
+ externalQueueStore = inst;
+}
+
+uint64_t
+SimpleQueue::getSize() {
+ return m_persistableData.size();
+}
+
+void
+SimpleQueue::write(char* target) {
+ ::memcpy(target, m_persistableData.data(), m_persistableData.size());
+}
+
+// --- Members & methods in msg handling path from qpid::Queue ---
+
+// protected
+SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) :
+ m_parent(q),
+ m_count(0)
+{}
+
+// protected
+bool
+SimpleQueue::UsageBarrier::acquire() {
+ qpid::sys::Monitor::ScopedLock l(m_monitor);
+ if (m_parent.m_destroyed) {
+ return false;
+ } else {
+ ++m_count;
+ return true;
+ }
+}
+
+// protected
+void SimpleQueue::UsageBarrier::release() {
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ if (--m_count == 0) {
+ m_monitor.notifyAll();
+ }
+}
+
+// protected
+void SimpleQueue::UsageBarrier::destroy() {
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ m_parent.m_destroyed = true;
+ while (m_count) {
+ m_monitor.wait();
+ }
+}
+
+// protected
+SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
+ m_barrier(b),
+ m_acquired(m_barrier.acquire())
+{}
+
+// protected
+SimpleQueue::ScopedUse::~ScopedUse() {
+ if (m_acquired) {
+ m_barrier.release();
+ }
+}
+
+// private
+void
+SimpleQueue::push(boost::shared_ptr<SimpleQueuedMessage> qm,
+ bool /*isRecovery*/) {
+ m_messages->push(qm);
+}
+
+// --- End Members & methods in msg handling path from qpid::Queue ---
+
+// private
+bool
+SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
+ assert(qm.get());
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm->payload(),
+ tb,
+ &handleAsyncEnqueueResult,
+ &m_resultQueue));
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
+ }
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// private static
+void
+SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ sq->enqueueComplete(qc);
+ }
+ }
+}
+
+// private
+bool
+SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
+ assert(qm.get());
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm->payload(),
+ tb,
+ &handleAsyncDequeueResult,
+ &m_resultQueue));
+ if (tb) {
+ tb->incrOpCnt();
+ m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
+ } else {
+ m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
+ }
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// private static
+void
+SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ sq->dequeueComplete(qc);
+ }
+ }
+}
+
+// private
+void
+SimpleQueue::destroyCheck(const std::string& opDescr) const {
+ if (m_destroyPending || m_destroyed) {
+ std::ostringstream oss;
+ oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
+ throw qpid::Exception(oss.str());
+ }
+}
+
+// private
+void
+SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
+ --m_asyncOpCounter;
+}
+
+// private
+void
+SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
+ --m_asyncOpCounter;
+}
+
+// private
+void
+SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ }
+ --m_asyncOpCounter;
+ m_destroyed = true;
+}
+
+// private
+void
+SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
+ }
+ --m_asyncOpCounter;
+}
+
+// private
+void
+SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
+ if (qc.get()) {
+ assert(qc->getQueue().get() == this);
+ if (qc->getTxnBuffer()) { // transactional enqueue
+ qc->getTxnBuffer()->decrOpCnt();
+ }
+ }
+ --m_asyncOpCounter;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h
new file mode 100644
index 0000000000..c2f21076cd
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleQueue.h
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleQueue.h
+ */
+
+#ifndef qpid_broker_SimpleQueue_h_
+#define qpid_broker_SimpleQueue_h_
+
+#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter
+#include "qpid/broker/AsyncStore.h" // DataSource
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/QueueHandle.h"
+#include "qpid/sys/Monitor.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class AsyncResultQueue;
+class QueueAsyncContext;
+class SimpleConsumer;
+class SimpleMessages;
+class SimpleQueuedMessage;
+class SimpleMessage;
+class SimpleTxnBuffer;
+
+class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
+ public PersistableQueue,
+ public DataSource
+{
+public:
+ SimpleQueue(const std::string& name,
+ const qpid::framing::FieldTable& args,
+ AsyncStore* store,
+ AsyncResultQueue& arq);
+ virtual ~SimpleQueue();
+
+ const QueueHandle& getHandle() const;
+ QueueHandle& getHandle();
+ AsyncStore* getStore();
+
+ void asyncCreate();
+ static void handleAsyncCreateResult(const AsyncResultHandle* const arh);
+ void asyncDestroy(const bool deleteQueue);
+ static void handleAsyncDestroyResult(const AsyncResultHandle* const arh);
+
+ // --- Methods in msg handling path from qpid::Queue ---
+ void deliver(boost::intrusive_ptr<SimpleMessage> msg);
+ bool dispatch(SimpleConsumer& sc);
+ bool enqueue(boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool enqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool dequeue(boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool dequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ void process(boost::intrusive_ptr<SimpleMessage> msg);
+ void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
+
+ // --- Interface qpid::broker::Persistable ---
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void setPersistenceId(uint64_t persistenceId) const;
+
+ // --- Interface qpid::broker::PersistableQueue ---
+ virtual void flush();
+ virtual const std::string& getName() const;
+ virtual void setExternalQueueStore(ExternalQueueStore* inst);
+
+ // --- Interface qpid::broker::DataStore ---
+ virtual uint64_t getSize();
+ virtual void write(char* target);
+
+private:
+ static TxnHandle s_nullTxnHandle; // used for non-txn operations
+
+ const std::string m_name;
+ AsyncStore* m_store;
+ AsyncResultQueue& m_resultQueue;
+ qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter!
+ mutable uint64_t m_persistenceId;
+ std::string m_persistableData;
+ QueueHandle m_queueHandle;
+ bool m_destroyPending;
+ bool m_destroyed;
+
+ // --- Members & methods in msg handling path copied from qpid::Queue ---
+ struct UsageBarrier {
+ SimpleQueue& m_parent;
+ uint32_t m_count;
+ qpid::sys::Monitor m_monitor;
+ UsageBarrier(SimpleQueue& q);
+ bool acquire();
+ void release();
+ void destroy();
+ };
+ struct ScopedUse {
+ UsageBarrier& m_barrier;
+ const bool m_acquired;
+ ScopedUse(UsageBarrier& b);
+ ~ScopedUse();
+ };
+ UsageBarrier m_barrier;
+ std::auto_ptr<SimpleMessages> m_messages;
+ void push(boost::shared_ptr<SimpleQueuedMessage> qm,
+ bool isRecovery = false);
+
+ // -- Async ops ---
+ bool asyncEnqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh);
+ bool asyncDequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ static void handleAsyncDequeueResult(const AsyncResultHandle* const arh);
+
+ // --- Async op counter ---
+ void destroyCheck(const std::string& opDescr) const;
+
+ // --- Async op completions (called through handleAsyncResult) ---
+ void createComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleQueue_h_
diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
new file mode 100644
index 0000000000..35ac799ecc
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleQueuedMessage.cpp
+ */
+
+#include "SimpleQueuedMessage.h"
+
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
+
+namespace qpid {
+namespace broker {
+
+SimpleQueuedMessage::SimpleQueuedMessage() :
+ m_queue(0)
+{}
+
+SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
+ m_queue(q),
+ m_msg(msg)
+{
+ if (m_queue->getStore()) {
+ m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+ }
+}
+
+SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
+ m_queue(qm.m_queue),
+ m_msg(qm.m_msg),
+ m_enqHandle(qm.m_enqHandle)
+{}
+
+SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
+ m_queue(qm->m_queue),
+ m_msg(qm->m_msg),
+ m_enqHandle(qm->m_enqHandle)
+{}
+
+SimpleQueuedMessage::~SimpleQueuedMessage() {}
+
+SimpleQueue*
+SimpleQueuedMessage::getQueue() const {
+ return m_queue;
+}
+
+boost::intrusive_ptr<SimpleMessage>
+SimpleQueuedMessage::payload() const {
+ return m_msg;
+}
+
+const EnqueueHandle&
+SimpleQueuedMessage::enqHandle() const {
+ return m_enqHandle;
+}
+
+EnqueueHandle&
+SimpleQueuedMessage::enqHandle() {
+ return m_enqHandle;
+}
+
+void
+SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) {
+ m_queue->enqueue(tb, shared_from_this());
+}
+
+void
+SimpleQueuedMessage::commitEnqueue() {
+ m_queue->process(m_msg);
+}
+
+void
+SimpleQueuedMessage::abortEnqueue() {
+ m_queue->enqueueAborted(m_msg);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h
new file mode 100644
index 0000000000..1172eb73f3
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleQueuedMessage.h
+ */
+
+#ifndef qpid_broker_SimpleQueuedMessage_h_
+#define qpid_broker_SimpleQueuedMessage_h_
+
+#include "AsyncStore.h"
+#include "EnqueueHandle.h"
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SimpleMessage;
+class SimpleQueue;
+
+class SimpleQueuedMessage : public boost::enable_shared_from_this<SimpleQueuedMessage>
+{
+public:
+ SimpleQueuedMessage();
+ SimpleQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg);
+ SimpleQueuedMessage(const SimpleQueuedMessage& qm);
+ SimpleQueuedMessage(SimpleQueuedMessage* const qm);
+ virtual ~SimpleQueuedMessage();
+ SimpleQueue* getQueue() const;
+ boost::intrusive_ptr<SimpleMessage> payload() const;
+ const EnqueueHandle& enqHandle() const;
+ EnqueueHandle& enqHandle();
+
+ // --- Transaction handling ---
+ void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb);
+ void commitEnqueue();
+ void abortEnqueue();
+
+private:
+ SimpleQueue* m_queue;
+ boost::intrusive_ptr<SimpleMessage> m_msg;
+ qpid::broker::EnqueueHandle m_enqHandle;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleQueuedMessage_h_
diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp
new file mode 100644
index 0000000000..343bbb54c7
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleTxnAccept.cpp
+ */
+
+#include "SimpleTxnAccept.h"
+
+#include "SimpleDeliveryRecord.h"
+
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+SimpleTxnAccept::SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops) :
+ m_ops(ops)
+{}
+
+SimpleTxnAccept::~SimpleTxnAccept() {}
+
+// --- Interface TxnOp ---
+
+bool
+SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() {
+ try {
+ for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->dequeue(tb);
+ }
+ return true;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)");
+ }
+ return false;
+}
+
+void
+SimpleTxnAccept::commit() throw() {
+ try {
+ for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
+ (*i)->committed();
+ (*i)->setEnded();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)");
+ }
+}
+
+void
+SimpleTxnAccept::rollback() throw() {}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h
new file mode 100644
index 0000000000..eb6963bc88
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleTxnAccept.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleTxnAccept.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
+#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
+
+#include "SimpleTxnOp.h"
+
+#include "boost/shared_ptr.hpp"
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+class SimpleDeliveryRecord;
+
+class SimpleTxnAccept: public SimpleTxnOp {
+public:
+ SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops);
+ virtual ~SimpleTxnAccept();
+
+ // --- Interface TxnOp ---
+ bool prepare(SimpleTxnBuffer* tb) throw();
+ void commit() throw();
+ void rollback() throw();
+private:
+ std::deque<boost::shared_ptr<SimpleDeliveryRecord> > m_ops;
+};
+
+}} // namespace qpid::broker
+
+#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
index 4d6e7b7918..d72a785c2a 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
@@ -18,14 +18,14 @@
*/
/**
- * \file TxnBuffer.cpp
+ * \file SimpleTxnBuffer.cpp
*/
-#include "TxnBuffer.h"
+#include "SimpleTxnBuffer.h"
#include "AsyncResultHandle.h"
+#include "SimpleTxnOp.h"
#include "TxnAsyncContext.h"
-#include "TxnOp.h"
#include "qpid/log/Statement.h"
@@ -34,9 +34,9 @@
namespace qpid {
namespace broker {
-qpid::sys::Mutex TxnBuffer::s_uuidMutex;
+qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex;
-TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
+SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) :
m_store(0),
m_resultQueue(arq),
m_tpcFlag(false),
@@ -47,7 +47,7 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
createLocalXid();
}
-TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
+SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) :
m_store(0),
m_resultQueue(arq),
m_xid(xid),
@@ -61,31 +61,31 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
}
}
-TxnBuffer::~TxnBuffer() {}
+SimpleTxnBuffer::~SimpleTxnBuffer() {}
TxnHandle&
-TxnBuffer::getTxnHandle() {
+SimpleTxnBuffer::getTxnHandle() {
return m_txnHandle;
}
const std::string&
-TxnBuffer::getXid() const {
+SimpleTxnBuffer::getXid() const {
return m_xid;
}
bool
-TxnBuffer::is2pc() const {
+SimpleTxnBuffer::is2pc() const {
return m_tpcFlag;
}
void
-TxnBuffer::incrOpCnt() {
+SimpleTxnBuffer::incrOpCnt() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex);
++m_submitOpCnt;
}
void
-TxnBuffer::decrOpCnt() {
+SimpleTxnBuffer::decrOpCnt() {
const uint32_t numOps = getNumOps();
qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex);
qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex);
@@ -99,15 +99,15 @@ TxnBuffer::decrOpCnt() {
}
void
-TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) {
+SimpleTxnBuffer::enlist(boost::shared_ptr<SimpleTxnOp> op) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
bool
-TxnBuffer::prepare() {
+SimpleTxnBuffer::prepare() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
if (!(*i)->prepare(this)) {
return false;
}
@@ -116,25 +116,25 @@ TxnBuffer::prepare() {
}
void
-TxnBuffer::commit() {
+SimpleTxnBuffer::commit() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
}
m_ops.clear();
}
void
-TxnBuffer::rollback() {
+SimpleTxnBuffer::rollback() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
}
m_ops.clear();
}
bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
+SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
try {
m_store = store;
asyncLocalCommit();
@@ -147,7 +147,7 @@ TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
}
void
-TxnBuffer::asyncLocalCommit() {
+SimpleTxnBuffer::asyncLocalCommit() {
switch(m_state) {
case NONE:
m_state = PREPARE;
@@ -180,7 +180,7 @@ TxnBuffer::asyncLocalCommit() {
//static
void
-TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
+SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
@@ -194,7 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
}
void
-TxnBuffer::asyncLocalAbort() {
+SimpleTxnBuffer::asyncLocalAbort() {
assert(m_store != 0);
switch (m_state) {
case NONE:
@@ -218,7 +218,7 @@ TxnBuffer::asyncLocalAbort() {
//static
void
-TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
+SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
@@ -231,14 +231,14 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
// private
uint32_t
-TxnBuffer::getNumOps() const {
+SimpleTxnBuffer::getNumOps() const {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
return m_ops.size();
}
// private
void
-TxnBuffer::createLocalXid()
+SimpleTxnBuffer::createLocalXid()
{
uuid_t uuid;
{
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h
index 02569f6545..b2164cfeed 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h
@@ -18,11 +18,11 @@
*/
/**
- * \file TxnBuffer.h
+ * \file SimpleTxnBuffer.h
*/
-#ifndef qpid_broker_TxnBuffer_h_
-#define qpid_broker_TxnBuffer_h_
+#ifndef qpid_broker_SimpleTxnBuffer_h_
+#define qpid_broker_SimpleTxnBuffer_h_
#include "TxnHandle.h"
@@ -37,20 +37,20 @@ namespace broker {
class AsyncResultHandle;
class AsyncResultQueue;
class AsyncTransactionalStore;
-class TxnOp;
+class SimpleTxnOp;
-class TxnBuffer {
+class SimpleTxnBuffer {
public:
- TxnBuffer(AsyncResultQueue& arq);
- TxnBuffer(AsyncResultQueue& arq, std::string& xid);
- virtual ~TxnBuffer();
+ SimpleTxnBuffer(AsyncResultQueue& arq);
+ SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid);
+ virtual ~SimpleTxnBuffer();
TxnHandle& getTxnHandle();
const std::string& getXid() const;
bool is2pc() const;
void incrOpCnt();
void decrOpCnt();
- void enlist(boost::shared_ptr<TxnOp> op);
+ void enlist(boost::shared_ptr<SimpleTxnOp> op);
bool prepare();
void commit();
void rollback();
@@ -68,7 +68,7 @@ private:
mutable qpid::sys::Mutex m_completeOpCntMutex;
static qpid::sys::Mutex s_uuidMutex;
- std::vector<boost::shared_ptr<TxnOp> > m_ops;
+ std::vector<boost::shared_ptr<SimpleTxnOp> > m_ops;
TxnHandle m_txnHandle;
AsyncTransactionalStore* m_store;
AsyncResultQueue& m_resultQueue;
@@ -86,4 +86,4 @@ private:
}} // namespace qpid::broker
-#endif // qpid_broker_TxnBuffer_h_
+#endif // qpid_broker_SimpleTxnBuffer_h_
diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h
index bcff87551c..2cec2da8f0 100644
--- a/cpp/src/qpid/broker/TxnOp.h
+++ b/cpp/src/qpid/broker/SimpleTxnOp.h
@@ -18,27 +18,27 @@
*/
/**
- * \file TxnOp.h
+ * \file SimpleTxnOp.h
*/
-#ifndef qpid_broker_TxnOp_h_
-#define qpid_broker_TxnOp_h_
+#ifndef qpid_broker_SimpleTxnOp_h_
+#define qpid_broker_SimpleTxnOp_h_
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
-class TxnBuffer;
+class SimpleTxnBuffer;
-class TxnOp{
+class SimpleTxnOp{
public:
- virtual ~TxnOp() {}
- virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0;
+ virtual ~SimpleTxnOp() {}
+ virtual bool prepare(SimpleTxnBuffer*) throw() = 0;
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
};
}} // namespace qpid::broker
-#endif // qpid_broker_TxnOp_h_
+#endif // qpid_broker_SimpleTxnOp_h_
diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp
new file mode 100644
index 0000000000..6ad6a108ea
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleTxnPublish.cpp
+ */
+
+#include "SimpleTxnPublish.h"
+
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
+#include "SimpleQueuedMessage.h"
+
+#include "qpid/log/Statement.h"
+#include <boost/make_shared.hpp>
+
+namespace qpid {
+namespace broker {
+
+SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
+ m_msg(msg)
+{}
+
+SimpleTxnPublish::~SimpleTxnPublish() {}
+
+bool
+SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() {
+ try {
+ while (!m_queues.empty()) {
+ m_queues.front()->prepareEnqueue(tb);
+ m_prepared.push_back(m_queues.front());
+ m_queues.pop_front();
+ }
+ return true;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)");
+ }
+ return false;
+}
+
+void
+SimpleTxnPublish::commit() throw() {
+ try {
+ for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ (*i)->commitEnqueue();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)");
+ }
+}
+
+void
+SimpleTxnPublish::rollback() throw() {
+ try {
+ for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ (*i)->abortEnqueue();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)");
+ }
+}
+
+uint64_t
+SimpleTxnPublish::contentSize() {
+ return m_msg->contentSize();
+}
+
+void
+SimpleTxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
+ m_queues.push_back(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(queue.get(), m_msg)));
+ m_delivered = true;
+}
+
+SimpleMessage&
+SimpleTxnPublish::getMessage() {
+ return *m_msg;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h
new file mode 100644
index 0000000000..0aaf8e4ba0
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleTxnPublish.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleTxnPublish.h
+ */
+
+#ifndef qpid_broker_SimpleTxnPublish_h_
+#define qpid_broker_SimpleTxnPublish_h_
+
+#include "SimpleDeliverable.h"
+#include "SimpleTxnOp.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <list>
+
+
+namespace qpid {
+namespace broker {
+
+class SimpleQueuedMessage;
+class SimpleMessage;
+class SimpleQueue;
+
+class SimpleTxnPublish : public SimpleTxnOp,
+ public SimpleDeliverable
+{
+public:
+ SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg);
+ virtual ~SimpleTxnPublish();
+
+ // --- Interface TxOp ---
+ bool prepare(SimpleTxnBuffer* tb) throw();
+ void commit() throw();
+ void rollback() throw();
+
+ // --- Interface Deliverable ---
+ uint64_t contentSize();
+ void deliverTo(const boost::shared_ptr<SimpleQueue>& queue);
+ SimpleMessage& getMessage();
+
+private:
+ boost::intrusive_ptr<SimpleMessage> m_msg;
+ std::list<boost::shared_ptr<SimpleQueuedMessage> > m_queues;
+ std::list<boost::shared_ptr<SimpleQueuedMessage> > m_prepared;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleTxnPublish_h_
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
index 63e2de2b41..527cb4741f 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.cpp
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -26,7 +26,7 @@
namespace qpid {
namespace broker {
-TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
+TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq):
m_tb(tb),
@@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
TxnAsyncContext::~TxnAsyncContext()
{}
-TxnBuffer*
+SimpleTxnBuffer*
TxnAsyncContext::getTxnBuffer() const
{
return m_tb;
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 9c617238e8..04f6ef76f5 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -29,38 +29,34 @@
#include "qpid/asyncStore/AsyncOperation.h"
namespace qpid {
-//namespace asyncStore {
-//class AsyncOperation;
-//}
namespace broker {
class AsyncResultHandle;
class AsyncResultQueue;
-//class TxnHandle;
typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
class TxnAsyncContext: public BrokerAsyncContext {
public:
- TxnAsyncContext(TxnBuffer* const tb,
+ TxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
virtual ~TxnAsyncContext();
- TxnBuffer* getTxnBuffer() const;
+ SimpleTxnBuffer* getTxnBuffer() const;
// --- Interface BrokerAsyncContext ---
AsyncResultQueue* getAsyncResultQueue() const;
void invokeCallback(const AsyncResultHandle* const) const;
private:
- TxnBuffer* const m_tb;
+ SimpleTxnBuffer* const m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
class TpcTxnAsyncContext : public TxnAsyncContext {
public:
- TpcTxnAsyncContext(TxnBuffer* const tb,
+ TpcTxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
TxnAsyncContext(tb, rcb, arq)