diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-01 14:05:21 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-01 14:05:21 +0000 |
| commit | 80bfab9ed823cebd9f8f58b559fd32df108bcf7d (patch) | |
| tree | 191bf724b9bf5b8394343d60ac4eac804e9c3d3a /cpp/src/qpid/broker | |
| parent | 63c6598f401ac6406e5a31c602c7892b798536fc (diff) | |
| download | qpid-python-80bfab9ed823cebd9f8f58b559fd32df108bcf7d.tar.gz | |
QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
28 files changed, 1871 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 6f1c02e059..7009565a7c 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -70,19 +70,19 @@ class TxnHandle; class QueueAsyncContext; class TpcTxnAsyncContext; class TxnAsyncContext; -class TxnBuffer; +class SimpleTxnBuffer; class AsyncTransactionalStore { public: virtual ~AsyncTransactionalStore() {} virtual TxnHandle createTxnHandle() = 0; - virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - TxnBuffer* tb) = 0; + SimpleTxnBuffer* tb) = 0; virtual void submitPrepare(TxnHandle&, boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only @@ -94,7 +94,7 @@ public: }; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: virtual ~AsyncStore() {} diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 4bd2d271eb..02eb2e9546 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, {} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const return m_msg; } -TxnBuffer* +SimpleTxnBuffer* QueueAsyncContext::getTxnBuffer() const { return m_tb; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index e9ba2ebbac..8657922377 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -52,18 +52,18 @@ public: AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr<PersistableQueue> getQueue() const; boost::intrusive_ptr<PersistableMessage> getMessage() const; - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const AsyncResultHandle* const arh) const; @@ -72,7 +72,7 @@ public: private: boost::shared_ptr<PersistableQueue> m_q; boost::intrusive_ptr<PersistableMessage> m_msg; - TxnBuffer* m_tb; + SimpleTxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h new file mode 100644 index 0000000000..6601c65a42 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleConsumer.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleConsumer.h + */ + +#ifndef qpid_broker_SimpleConsumer_h_ +#define qpid_broker_SimpleConsumer_h_ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class SimpleDeliveryRecord; + +class SimpleConsumer { +public: + virtual ~SimpleConsumer() {} + virtual void commitComplete() = 0; + virtual void record(boost::shared_ptr<SimpleDeliveryRecord> dr) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleConsumer_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp new file mode 100644 index 0000000000..7037a377c5 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.cpp + */ + +#include "SimpleDeliverable.h" + +namespace qpid { +namespace broker { + +SimpleDeliverable::SimpleDeliverable() : + m_delivered(false) +{} + +SimpleDeliverable::~SimpleDeliverable() {} + +bool +SimpleDeliverable::isDelivered() const { + return m_delivered; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h new file mode 100644 index 0000000000..6441e14841 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.h + */ + +#ifndef qpid_broker_SimpleDeliverable_h_ +#define qpid_broker_SimpleDeliverable_h_ + +#include <boost/shared_ptr.hpp> +#include <stdint.h> // uint64_t + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleDeliverable +{ +public: + SimpleDeliverable(); + virtual ~SimpleDeliverable(); + + virtual uint64_t contentSize() = 0; + virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0; + virtual SimpleMessage& getMessage() = 0; + virtual bool isDelivered() const; + +protected: + bool m_delivered; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliverable_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp new file mode 100644 index 0000000000..b71df6975b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.cpp + */ + +#include "SimpleDeliveryRecord.h" + +#include "SimpleConsumer.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm, + SimpleConsumer& sc, + bool accepted) : + m_queuedMessage(qm), + m_msgConsumer(sc), + m_accepted(accepted), + m_ended(accepted) +{} + +SimpleDeliveryRecord::~SimpleDeliveryRecord() {} + +bool +SimpleDeliveryRecord::accept() { + if (!m_ended) { + m_queuedMessage->getQueue()->dequeue(m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isAccepted() const { + return m_accepted; +} + +bool +SimpleDeliveryRecord::setEnded() { + m_ended = true; + m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0); + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isEnded() const { + return m_ended; +} + +bool +SimpleDeliveryRecord::isRedundant() const { + return m_ended; +} + +void +SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) { + m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); +} + +void +SimpleDeliveryRecord::committed() const { + m_msgConsumer.commitComplete(); +} + +boost::shared_ptr<SimpleQueuedMessage> +SimpleDeliveryRecord::getQueuedMessage() const { + return m_queuedMessage; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h new file mode 100644 index 0000000000..622ce578d7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.h + */ + +#ifndef qpid_broker_SimpleDeliveryRecord_h_ +#define qpid_broker_SimpleDeliveryRecord_h_ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class SimpleConsumer; +class SimpleQueuedMessage; +class SimpleTxnBuffer; + +class SimpleDeliveryRecord { +public: + SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm, + SimpleConsumer& sc, + bool accepted); + virtual ~SimpleDeliveryRecord(); + bool accept(); + bool isAccepted() const; + bool setEnded(); + bool isEnded() const; + bool isRedundant() const; + void dequeue(qpid::broker::SimpleTxnBuffer* tb); + void committed() const; + boost::shared_ptr<SimpleQueuedMessage> getQueuedMessage() const; +private: + boost::shared_ptr<SimpleQueuedMessage> m_queuedMessage; + SimpleConsumer& m_msgConsumer; + bool m_accepted : 1; + bool m_ended : 1; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliveryRecord_h_ diff --git a/cpp/src/qpid/broker/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp new file mode 100644 index 0000000000..1239533edf --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.cpp + */ + +#include "SimpleMessage.h" + +#include <string.h> // memcpy() + +namespace qpid { +namespace broker { + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_store(0), + m_msgHandle(MessageHandle()) +{} + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_store(store), + m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle()) +{} + +SimpleMessage::~SimpleMessage() {} + +const MessageHandle& +SimpleMessage::getHandle() const { + return m_msgHandle; +} + +MessageHandle& +SimpleMessage::getHandle() { + return m_msgHandle; +} + +uint64_t +SimpleMessage::contentSize() const { + return static_cast<uint64_t>(m_msg.size()); +} + +void +SimpleMessage::setPersistenceId(uint64_t id) const { + m_persistenceId = id; +} + +uint64_t +SimpleMessage::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleMessage::encode(qpid::framing::Buffer& buffer) const { + buffer.putRawData(m_msg); +} + +uint32_t +SimpleMessage::encodedSize() const { + return static_cast<uint32_t>(m_msg.size()); +} + +void +SimpleMessage::allDequeuesComplete() {} + +uint32_t +SimpleMessage::encodedHeaderSize() const { + return 0; +} + +bool +SimpleMessage::isPersistent() const { + return m_store != 0; +} + +uint64_t +SimpleMessage::getSize() { + return m_msg.size(); +} + +void +SimpleMessage::write(char* target) { + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h new file mode 100644 index 0000000000..edfaa8d13b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.h + */ + +#ifndef qpid_broker_SimpleMessage_h_ +#define qpid_broker_SimpleMessage_h_ + +#include "AsyncStore.h" // DataSource +#include "MessageHandle.h" +#include "PersistableMessage.h" + +namespace qpid { +namespace broker { + +class SimpleMessage: public PersistableMessage, + public DataSource +{ +public: + SimpleMessage(const char* msgData, + const uint32_t msgSize); + SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store); + virtual ~SimpleMessage(); + const MessageHandle& getHandle() const; + MessageHandle& getHandle(); + uint64_t contentSize() const; + + // --- Interface Persistable --- + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // --- Interface PersistableMessage --- + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? + virtual void write(char* target); + +private: + mutable uint64_t m_persistenceId; + const std::string m_msg; + AsyncStore* m_store; + + MessageHandle m_msgHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp new file mode 100644 index 0000000000..a88258f5bc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.cpp + */ + +#include "SimpleMessageAsyncContext.h" + +#include "SimpleMessage.h" + +#include <cassert> + +namespace qpid { +namespace broker { + +SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, + boost::shared_ptr<SimpleQueue> q) : + m_msg(msg), + m_q(q) +{ + assert(m_msg.get() != 0); + assert(m_q.get() != 0); +} + +SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {} + +boost::intrusive_ptr<SimpleMessage> +SimpleMessageAsyncContext::getMessage() const { + return m_msg; +} + +boost::shared_ptr<SimpleQueue> +SimpleMessageAsyncContext::getQueue() const { + return m_q; +} + +void +SimpleMessageAsyncContext::destroy() { + delete this; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h new file mode 100644 index 0000000000..e3975e790e --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.h + */ + +#ifndef qpid_broker_SimpleMessageAsyncContext_h_ +#define qpid_broker_SimpleMessageAsyncContext_h_ + +#include "AsyncStore.h" // BrokerAsyncContext + +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleMessageAsyncContext : public BrokerAsyncContext +{ +public: + SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, + boost::shared_ptr<SimpleQueue> q); + virtual ~SimpleMessageAsyncContext(); + boost::intrusive_ptr<SimpleMessage> getMessage() const; + boost::shared_ptr<SimpleQueue> getQueue() const; + void destroy(); + +private: + boost::intrusive_ptr<SimpleMessage> m_msg; + boost::shared_ptr<SimpleQueue> m_q; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageAsyncContext_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp new file mode 100644 index 0000000000..0aadcfd94a --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.cpp + */ + +#include "SimpleMessageDeque.h" + +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleMessageDeque::SimpleMessageDeque() {} + +SimpleMessageDeque::~SimpleMessageDeque() {} + +uint32_t +SimpleMessageDeque::size() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + return m_messages.size(); +} + +bool +SimpleMessageDeque::push(boost::shared_ptr<SimpleQueuedMessage>& added) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + m_messages.push_back(added); + return false; +} + +bool +SimpleMessageDeque::consume(boost::shared_ptr<SimpleQueuedMessage>& msg) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + if (!m_messages.empty()) { + msg = m_messages.front(); + m_messages.pop_front(); + return true; + } + return false; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h new file mode 100644 index 0000000000..5db0755a43 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.h + */ + +/* + * This is a copy of qpid::broker::MessageDeque.h, but using the local + * SimpleQueuedMessage class instead of QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessageDeque_h_ +#define qpid_broker_SimpleMessageDeque_h_ + +#include "SimpleMessages.h" + +#include "qpid/sys/Mutex.h" + +#include <deque> + +namespace qpid { +namespace broker { + +class SimpleMessageDeque : public SimpleMessages +{ +public: + SimpleMessageDeque(); + virtual ~SimpleMessageDeque(); + uint32_t size(); + bool push(boost::shared_ptr<SimpleQueuedMessage>& added); + bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg); +private: + std::deque<boost::shared_ptr<SimpleQueuedMessage> > m_messages; + qpid::sys::Mutex m_msgMutex; + +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageDeque_h_ diff --git a/cpp/src/qpid/broker/SimpleMessages.h b/cpp/src/qpid/broker/SimpleMessages.h new file mode 100644 index 0000000000..2a40859032 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessages.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessages.h + */ + +/* + * This is a copy of qpid::broker::Messages.h, but using the local + * tests::storePerftools::asyncPerf::QueuedMessage class instead of + * qpid::broker::QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessages_h_ +#define qpid_broker_SimpleMessages_h_ + +#include <boost/shared_ptr.hpp> +#include <stdint.h> + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; + +class SimpleMessages +{ +public: + virtual ~SimpleMessages() {} + virtual uint32_t size() = 0; + virtual bool push(boost::shared_ptr<SimpleQueuedMessage>& added) = 0; + virtual bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessages_h_ diff --git a/cpp/src/qpid/broker/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp new file mode 100644 index 0000000000..5cd8841f94 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.cpp @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.cpp + */ + +#include "SimpleQueue.h" + +#include "AsyncResultHandle.h" +#include "QueueAsyncContext.h" +#include "SimpleConsumer.h" +#include "SimpleDeliveryRecord.h" +#include "SimpleMessage.h" +#include "SimpleMessageDeque.h" +#include "SimpleQueuedMessage.h" +#include "SimpleTxnBuffer.h" + +#include <string.h> // memcpy() + +namespace qpid { +namespace broker { + +//static +TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations + + +SimpleQueue::SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + AsyncStore* store, + AsyncResultQueue& arq) : + PersistableQueue(), + m_name(name), + m_store(store), + m_resultQueue(arq), + m_asyncOpCounter(0UL), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new SimpleMessageDeque()) +{ + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } +} + +SimpleQueue::~SimpleQueue() {} + +const QueueHandle& +SimpleQueue::getHandle() const { + return m_queueHandle; +} + +QueueHandle& +SimpleQueue::getHandle() { + return m_queueHandle; +} + +AsyncStore* +SimpleQueue::getStore() { + return m_store; +} + +void +SimpleQueue::asyncCreate() { + if (m_store) { + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncCreateResult, + &m_resultQueue)); + m_store->submitCreate(m_queueHandle, this, qac); + ++m_asyncOpCounter; + } +} + +//static +void +SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->createComplete(qc); + } + } +} + +void +SimpleQueue::asyncDestroy(const bool deleteQueue) +{ + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncDestroyResult, + &m_resultQueue)); + m_store->submitDestroy(m_queueHandle, qac); + ++m_asyncOpCounter; + } + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); + } +} + +//static +void +SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = + boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->destroyComplete(qc); + } + } +} + +void +SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { + boost::shared_ptr<SimpleQueuedMessage> qm(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg))); + enqueue(qm); + push(qm); +} + +bool +SimpleQueue::dispatch(SimpleConsumer& sc) { + boost::shared_ptr<SimpleQueuedMessage> qm; + if (m_messages->consume(qm)) { + boost::shared_ptr<SimpleDeliveryRecord> dr(new SimpleDeliveryRecord(qm, sc, false)); + sc.record(dr); + return true; + } + return false; +} + +bool +SimpleQueue::enqueue(boost::shared_ptr<SimpleQueuedMessage> qm) { + return enqueue(0, qm); +} + +bool +SimpleQueue::enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(tb, qm); + } + return false; +} + +bool +SimpleQueue::dequeue(boost::shared_ptr<SimpleQueuedMessage> qm) { + return dequeue(0, qm); +} + +bool +SimpleQueue::dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(tb, qm); + } + return true; +} + +void +SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { + push(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg))); +} + +void +SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {} + +void +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { + buffer.putShortString(m_name); +} + +uint32_t +SimpleQueue::encodedSize() const { + return m_name.size() + 1; +} + +uint64_t +SimpleQueue::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { + m_persistenceId = persistenceId; +} + +void +SimpleQueue::flush() { + //if(m_store) m_store->flush(*this); +} + +const std::string& +SimpleQueue::getName() const { + return m_name; +} + +void +SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) { + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +SimpleQueue::getSize() { + return m_persistableData.size(); +} + +void +SimpleQueue::write(char* target) { + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +SimpleQueue::UsageBarrier::acquire() { + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void SimpleQueue::UsageBarrier::release() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void SimpleQueue::UsageBarrier::destroy() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +SimpleQueue::ScopedUse::~ScopedUse() { + if (m_acquired) { + m_barrier.release(); + } +} + +// private +void +SimpleQueue::push(boost::shared_ptr<SimpleQueuedMessage> qm, + bool /*isRecovery*/) { + m_messages->push(qm); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// private +bool +SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { + assert(qm.get()); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncEnqueueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = + boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->enqueueComplete(qc); + } + } +} + +// private +bool +SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { + assert(qm.get()); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncDequeueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->dequeueComplete(qc); + } + } +} + +// private +void +SimpleQueue::destroyCheck(const std::string& opDescr) const { + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + +// private +void +SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; + m_destroyed = true; +} + +// private +void +SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h new file mode 100644 index 0000000000..c2f21076cd --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.h + */ + +#ifndef qpid_broker_SimpleQueue_h_ +#define qpid_broker_SimpleQueue_h_ + +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter +#include "qpid/broker/AsyncStore.h" // DataSource +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Monitor.h" + +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { + +class AsyncResultQueue; +class QueueAsyncContext; +class SimpleConsumer; +class SimpleMessages; +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleTxnBuffer; + +class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, + public PersistableQueue, + public DataSource +{ +public: + SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& args, + AsyncStore* store, + AsyncResultQueue& arq); + virtual ~SimpleQueue(); + + const QueueHandle& getHandle() const; + QueueHandle& getHandle(); + AsyncStore* getStore(); + + void asyncCreate(); + static void handleAsyncCreateResult(const AsyncResultHandle* const arh); + void asyncDestroy(const bool deleteQueue); + static void handleAsyncDestroyResult(const AsyncResultHandle* const arh); + + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::intrusive_ptr<SimpleMessage> msg); + bool dispatch(SimpleConsumer& sc); + bool enqueue(boost::shared_ptr<SimpleQueuedMessage> qm); + bool enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + bool dequeue(boost::shared_ptr<SimpleQueuedMessage> qm); + bool dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + void process(boost::intrusive_ptr<SimpleMessage> msg); + void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(ExternalQueueStore* inst); + + // --- Interface qpid::broker::DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + static TxnHandle s_nullTxnHandle; // used for non-txn operations + + const std::string m_name; + AsyncStore* m_store; + AsyncResultQueue& m_resultQueue; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! + mutable uint64_t m_persistenceId; + std::string m_persistableData; + QueueHandle m_queueHandle; + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier { + SimpleQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(SimpleQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr<SimpleMessages> m_messages; + void push(boost::shared_ptr<SimpleQueuedMessage> qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh); + bool asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + static void handleAsyncDequeueResult(const AsyncResultHandle* const arh); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- + void createComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueue_h_ diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp new file mode 100644 index 0000000000..35ac799ecc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.cpp + */ + +#include "SimpleQueuedMessage.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" + +namespace qpid { +namespace broker { + +SimpleQueuedMessage::SimpleQueuedMessage() : + m_queue(0) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), + m_queue(q), + m_msg(msg) +{ + if (m_queue->getStore()) { + m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); + } +} + +SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), + m_queue(qm.m_queue), + m_msg(qm.m_msg), + m_enqHandle(qm.m_enqHandle) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), + m_queue(qm->m_queue), + m_msg(qm->m_msg), + m_enqHandle(qm->m_enqHandle) +{} + +SimpleQueuedMessage::~SimpleQueuedMessage() {} + +SimpleQueue* +SimpleQueuedMessage::getQueue() const { + return m_queue; +} + +boost::intrusive_ptr<SimpleMessage> +SimpleQueuedMessage::payload() const { + return m_msg; +} + +const EnqueueHandle& +SimpleQueuedMessage::enqHandle() const { + return m_enqHandle; +} + +EnqueueHandle& +SimpleQueuedMessage::enqHandle() { + return m_enqHandle; +} + +void +SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) { + m_queue->enqueue(tb, shared_from_this()); +} + +void +SimpleQueuedMessage::commitEnqueue() { + m_queue->process(m_msg); +} + +void +SimpleQueuedMessage::abortEnqueue() { + m_queue->enqueueAborted(m_msg); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h new file mode 100644 index 0000000000..1172eb73f3 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.h + */ + +#ifndef qpid_broker_SimpleQueuedMessage_h_ +#define qpid_broker_SimpleQueuedMessage_h_ + +#include "AsyncStore.h" +#include "EnqueueHandle.h" + +#include <boost/enable_shared_from_this.hpp> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleQueuedMessage : public boost::enable_shared_from_this<SimpleQueuedMessage> +{ +public: + SimpleQueuedMessage(); + SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg); + SimpleQueuedMessage(const SimpleQueuedMessage& qm); + SimpleQueuedMessage(SimpleQueuedMessage* const qm); + virtual ~SimpleQueuedMessage(); + SimpleQueue* getQueue() const; + boost::intrusive_ptr<SimpleMessage> payload() const; + const EnqueueHandle& enqHandle() const; + EnqueueHandle& enqHandle(); + + // --- Transaction handling --- + void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb); + void commitEnqueue(); + void abortEnqueue(); + +private: + SimpleQueue* m_queue; + boost::intrusive_ptr<SimpleMessage> m_msg; + qpid::broker::EnqueueHandle m_enqHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueuedMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp new file mode 100644 index 0000000000..343bbb54c7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.cpp + */ + +#include "SimpleTxnAccept.h" + +#include "SimpleDeliveryRecord.h" + +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +SimpleTxnAccept::SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops) : + m_ops(ops) +{} + +SimpleTxnAccept::~SimpleTxnAccept() {} + +// --- Interface TxnOp --- + +bool +SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() { + try { + for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->dequeue(tb); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnAccept::commit() throw() { + try { + for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { + (*i)->committed(); + (*i)->setEnded(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); + } catch(...) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnAccept::rollback() throw() {} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h new file mode 100644 index 0000000000..eb6963bc88 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ +#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ + +#include "SimpleTxnOp.h" + +#include "boost/shared_ptr.hpp" +#include <deque> + +namespace qpid { +namespace broker { + +class SimpleDeliveryRecord; + +class SimpleTxnAccept: public SimpleTxnOp { +public: + SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops); + virtual ~SimpleTxnAccept(); + + // --- Interface TxnOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); +private: + std::deque<boost::shared_ptr<SimpleDeliveryRecord> > m_ops; +}; + +}} // namespace qpid::broker + +#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp index 4d6e7b7918..d72a785c2a 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp @@ -18,14 +18,14 @@ */ /** - * \file TxnBuffer.cpp + * \file SimpleTxnBuffer.cpp */ -#include "TxnBuffer.h" +#include "SimpleTxnBuffer.h" #include "AsyncResultHandle.h" +#include "SimpleTxnOp.h" #include "TxnAsyncContext.h" -#include "TxnOp.h" #include "qpid/log/Statement.h" @@ -34,9 +34,9 @@ namespace qpid { namespace broker { -qpid::sys::Mutex TxnBuffer::s_uuidMutex; +qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex; -TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) : m_store(0), m_resultQueue(arq), m_tpcFlag(false), @@ -47,7 +47,7 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : createLocalXid(); } -TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) : m_store(0), m_resultQueue(arq), m_xid(xid), @@ -61,31 +61,31 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : } } -TxnBuffer::~TxnBuffer() {} +SimpleTxnBuffer::~SimpleTxnBuffer() {} TxnHandle& -TxnBuffer::getTxnHandle() { +SimpleTxnBuffer::getTxnHandle() { return m_txnHandle; } const std::string& -TxnBuffer::getXid() const { +SimpleTxnBuffer::getXid() const { return m_xid; } bool -TxnBuffer::is2pc() const { +SimpleTxnBuffer::is2pc() const { return m_tpcFlag; } void -TxnBuffer::incrOpCnt() { +SimpleTxnBuffer::incrOpCnt() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex); ++m_submitOpCnt; } void -TxnBuffer::decrOpCnt() { +SimpleTxnBuffer::decrOpCnt() { const uint32_t numOps = getNumOps(); qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex); qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex); @@ -99,15 +99,15 @@ TxnBuffer::decrOpCnt() { } void -TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) { +SimpleTxnBuffer::enlist(boost::shared_ptr<SimpleTxnOp> op) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); m_ops.push_back(op); } bool -TxnBuffer::prepare() { +SimpleTxnBuffer::prepare() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { if (!(*i)->prepare(this)) { return false; } @@ -116,25 +116,25 @@ TxnBuffer::prepare() { } void -TxnBuffer::commit() { +SimpleTxnBuffer::commit() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); } m_ops.clear(); } void -TxnBuffer::rollback() { +SimpleTxnBuffer::rollback() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); } m_ops.clear(); } bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { +SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) { try { m_store = store; asyncLocalCommit(); @@ -147,7 +147,7 @@ TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { } void -TxnBuffer::asyncLocalCommit() { +SimpleTxnBuffer::asyncLocalCommit() { switch(m_state) { case NONE: m_state = PREPARE; @@ -180,7 +180,7 @@ TxnBuffer::asyncLocalCommit() { //static void -TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { +SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { @@ -194,7 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { } void -TxnBuffer::asyncLocalAbort() { +SimpleTxnBuffer::asyncLocalAbort() { assert(m_store != 0); switch (m_state) { case NONE: @@ -218,7 +218,7 @@ TxnBuffer::asyncLocalAbort() { //static void -TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { +SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { @@ -231,14 +231,14 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { // private uint32_t -TxnBuffer::getNumOps() const { +SimpleTxnBuffer::getNumOps() const { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); return m_ops.size(); } // private void -TxnBuffer::createLocalXid() +SimpleTxnBuffer::createLocalXid() { uuid_t uuid; { diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h index 02569f6545..b2164cfeed 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h @@ -18,11 +18,11 @@ */ /** - * \file TxnBuffer.h + * \file SimpleTxnBuffer.h */ -#ifndef qpid_broker_TxnBuffer_h_ -#define qpid_broker_TxnBuffer_h_ +#ifndef qpid_broker_SimpleTxnBuffer_h_ +#define qpid_broker_SimpleTxnBuffer_h_ #include "TxnHandle.h" @@ -37,20 +37,20 @@ namespace broker { class AsyncResultHandle; class AsyncResultQueue; class AsyncTransactionalStore; -class TxnOp; +class SimpleTxnOp; -class TxnBuffer { +class SimpleTxnBuffer { public: - TxnBuffer(AsyncResultQueue& arq); - TxnBuffer(AsyncResultQueue& arq, std::string& xid); - virtual ~TxnBuffer(); + SimpleTxnBuffer(AsyncResultQueue& arq); + SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid); + virtual ~SimpleTxnBuffer(); TxnHandle& getTxnHandle(); const std::string& getXid() const; bool is2pc() const; void incrOpCnt(); void decrOpCnt(); - void enlist(boost::shared_ptr<TxnOp> op); + void enlist(boost::shared_ptr<SimpleTxnOp> op); bool prepare(); void commit(); void rollback(); @@ -68,7 +68,7 @@ private: mutable qpid::sys::Mutex m_completeOpCntMutex; static qpid::sys::Mutex s_uuidMutex; - std::vector<boost::shared_ptr<TxnOp> > m_ops; + std::vector<boost::shared_ptr<SimpleTxnOp> > m_ops; TxnHandle m_txnHandle; AsyncTransactionalStore* m_store; AsyncResultQueue& m_resultQueue; @@ -86,4 +86,4 @@ private: }} // namespace qpid::broker -#endif // qpid_broker_TxnBuffer_h_ +#endif // qpid_broker_SimpleTxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h index bcff87551c..2cec2da8f0 100644 --- a/cpp/src/qpid/broker/TxnOp.h +++ b/cpp/src/qpid/broker/SimpleTxnOp.h @@ -18,27 +18,27 @@ */ /** - * \file TxnOp.h + * \file SimpleTxnOp.h */ -#ifndef qpid_broker_TxnOp_h_ -#define qpid_broker_TxnOp_h_ +#ifndef qpid_broker_SimpleTxnOp_h_ +#define qpid_broker_SimpleTxnOp_h_ #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { -class TxnBuffer; +class SimpleTxnBuffer; -class TxnOp{ +class SimpleTxnOp{ public: - virtual ~TxnOp() {} - virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0; + virtual ~SimpleTxnOp() {} + virtual bool prepare(SimpleTxnBuffer*) throw() = 0; virtual void commit() throw() = 0; virtual void rollback() throw() = 0; }; }} // namespace qpid::broker -#endif // qpid_broker_TxnOp_h_ +#endif // qpid_broker_SimpleTxnOp_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp new file mode 100644 index 0000000000..6ad6a108ea --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.cpp + */ + +#include "SimpleTxnPublish.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +#include "qpid/log/Statement.h" +#include <boost/make_shared.hpp> + +namespace qpid { +namespace broker { + +SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : + m_msg(msg) +{} + +SimpleTxnPublish::~SimpleTxnPublish() {} + +bool +SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() { + try { + while (!m_queues.empty()) { + m_queues.front()->prepareEnqueue(tb); + m_prepared.push_back(m_queues.front()); + m_queues.pop_front(); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnPublish::commit() throw() { + try { + for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->commitEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnPublish::rollback() throw() { + try { + for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->abortEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); + } +} + +uint64_t +SimpleTxnPublish::contentSize() { + return m_msg->contentSize(); +} + +void +SimpleTxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { + m_queues.push_back(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(queue.get(), m_msg))); + m_delivered = true; +} + +SimpleMessage& +SimpleTxnPublish::getMessage() { + return *m_msg; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h new file mode 100644 index 0000000000..0aaf8e4ba0 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.h + */ + +#ifndef qpid_broker_SimpleTxnPublish_h_ +#define qpid_broker_SimpleTxnPublish_h_ + +#include "SimpleDeliverable.h" +#include "SimpleTxnOp.h" + +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include <list> + + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleQueue; + +class SimpleTxnPublish : public SimpleTxnOp, + public SimpleDeliverable +{ +public: + SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg); + virtual ~SimpleTxnPublish(); + + // --- Interface TxOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); + + // --- Interface Deliverable --- + uint64_t contentSize(); + void deliverTo(const boost::shared_ptr<SimpleQueue>& queue); + SimpleMessage& getMessage(); + +private: + boost::intrusive_ptr<SimpleMessage> m_msg; + std::list<boost::shared_ptr<SimpleQueuedMessage> > m_queues; + std::list<boost::shared_ptr<SimpleQueuedMessage> > m_prepared; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnPublish_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index 63e2de2b41..527cb4741f 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -26,7 +26,7 @@ namespace qpid { namespace broker { -TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, +TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq): m_tb(tb), @@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, TxnAsyncContext::~TxnAsyncContext() {} -TxnBuffer* +SimpleTxnBuffer* TxnAsyncContext::getTxnBuffer() const { return m_tb; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9c617238e8..04f6ef76f5 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,38 +29,34 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { -//namespace asyncStore { -//class AsyncOperation; -//} namespace broker { class AsyncResultHandle; class AsyncResultQueue; -//class TxnHandle; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); class TxnAsyncContext: public BrokerAsyncContext { public: - TxnAsyncContext(TxnBuffer* const tb, + TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; void invokeCallback(const AsyncResultHandle* const) const; private: - TxnBuffer* const m_tb; + SimpleTxnBuffer* const m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; class TpcTxnAsyncContext : public TxnAsyncContext { public: - TpcTxnAsyncContext(TxnBuffer* const tb, + TpcTxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : TxnAsyncContext(tb, rcb, arq) |
