diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-17 12:17:55 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-17 12:17:55 +0000 |
| commit | 59a860e074cc38f43b56fe63ff93bd05648dd58f (patch) | |
| tree | b2cd38778f0f4513293f4a2b551f46d72b419be9 /qpid | |
| parent | 505150f4ff07c7714411dd5b2160e32c643833b6 (diff) | |
| download | qpid-python-59a860e074cc38f43b56fe63ff93bd05648dd58f.tar.gz | |
Scope exclusive queues to sessions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@637854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionToken.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.h | 17 | ||||
| -rw-r--r-- | qpid/python/cpp_failing_0-10.txt | 2 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/alternate_exchange.py | 7 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/queue.py | 25 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/tx.py | 46 |
11 files changed, 96 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/broker/ConnectionToken.h b/qpid/cpp/src/qpid/broker/ConnectionToken.h index 7e7f813d0e..38b7d7d098 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionToken.h +++ b/qpid/cpp/src/qpid/broker/ConnectionToken.h @@ -21,13 +21,14 @@ #ifndef _ConnectionToken_ #define _ConnectionToken_ +#include "OwnershipToken.h" namespace qpid { namespace broker { /** * An empty interface allowing opaque implementations of some * form of token to identify a connection. */ - class ConnectionToken{ + class ConnectionToken : public OwnershipToken { public: virtual ~ConnectionToken(){} }; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index abe4f3f9a5..c4094a117b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -46,7 +46,7 @@ using std::mem_fun; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, - const ConnectionToken* const _owner, + const OwnershipToken* const _owner, Manageable* parent) : name(_name), @@ -582,7 +582,7 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) } -bool Queue::isExclusiveOwner(const ConnectionToken* const o) const +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); return o == owner; @@ -594,7 +594,7 @@ void Queue::releaseExclusiveOwnership() owner = 0; } -bool Queue::setExclusiveOwner(const ConnectionToken* const o) +bool Queue::setExclusiveOwner(const OwnershipToken* const o) { Mutex::ScopedLock locker(ownershipLock); if (owner) { diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index aaae175be8..bd6f1fe2c7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -28,7 +28,7 @@ #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> #include "qpid/framing/amqp_types.h" -#include "ConnectionToken.h" +#include "OwnershipToken.h" #include "Consumer.h" #include "Message.h" #include "qpid/framing/FieldTable.h" @@ -63,7 +63,7 @@ namespace qpid { const string name; const bool autodelete; MessageStore* store; - const ConnectionToken* owner; + const OwnershipToken* owner; uint32_t consumerCount; bool exclusive; Listeners listeners; @@ -100,7 +100,7 @@ namespace qpid { Queue(const string& name, bool autodelete = false, MessageStore* const store = 0, - const ConnectionToken* const owner = 0, + const OwnershipToken* const owner = 0, Manageable* parent = 0); ~Queue(); @@ -143,9 +143,9 @@ namespace qpid { uint32_t getMessageCount() const; uint32_t getConsumerCount() const; inline const string& getName() const { return name; } - bool isExclusiveOwner(const ConnectionToken* const o) const; + bool isExclusiveOwner(const OwnershipToken* const o) const; void releaseExclusiveOwnership(); - bool setExclusiveOwner(const ConnectionToken* const o); + bool setExclusiveOwner(const OwnershipToken* const o); bool hasExclusiveConsumer() const; bool hasExclusiveOwner() const; inline bool isDurable() const { return store != 0; } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 14d5f362e3..61bdb0ffde 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -33,7 +33,7 @@ QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> QueueRegistry::declare(const string& declareName, bool durable, - bool autoDelete, const ConnectionToken* owner) + bool autoDelete, const OwnershipToken* owner) { RWlock::ScopedWlock locker(lock); string name = declareName.empty() ? generateName() : declareName; diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index ccccef31b5..60da0619f4 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -48,7 +48,7 @@ class QueueRegistry{ * was created by this declare call false if it already existed. */ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false, - const ConnectionToken* const owner = 0); + const OwnershipToken* const owner = 0); /** * Destroy the named queue. @@ -62,7 +62,6 @@ class QueueRegistry{ * subsequent calls to find or declare with the same name. * */ - void destroyLH (const string& name); void destroy (const string& name); template <class Test> bool destroyIf(const string& name, Test test) { @@ -107,6 +106,9 @@ private: int counter; MessageStore* store; management::Manageable* parent; + + //destroy impl that assumes lock is already held: + void destroyLH (const string& name); }; diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 64bb7cdae3..5d33e68fab 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -19,6 +19,7 @@ #include "Connection.h" #include "DeliveryToken.h" #include "MessageDelivery.h" +#include "Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> @@ -180,6 +181,22 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str } } +SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker()) +{} + + +SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() +{ + while (!exclusiveQueues.empty()) { + Queue::shared_ptr q(exclusiveQueues.front()); + q->releaseExclusiveOwnership(); + if (q->canAutoDelete()) { + Queue::tryAutoDelete(broker, q); + } + exclusiveQueues.erase(exclusiveQueues.begin()); + } +} + Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = getQueue(name); @@ -212,7 +229,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& getBroker().getQueues().declare( name, durable, autoDelete, - exclusive ? &getConnection() : 0); + exclusive ? this : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -230,15 +247,15 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& //handle automatic cleanup: if (exclusive) { - getConnection().exclusiveQueues.push_back(queue); + exclusiveQueues.push_back(queue); } } else { - if (exclusive && queue->setExclusiveOwner(&getConnection())) { - getConnection().exclusiveQueues.push_back(queue); + if (exclusive && queue->setExclusiveOwner(this)) { + exclusiveQueues.push_back(queue); } } } - if (exclusive && !queue->isExclusiveOwner(&getConnection())) + if (exclusive && !queue->isExclusiveOwner(this)) throw ResourceLockedException( QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h index e4c3a8676f..dad89cd123 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.h +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h @@ -25,8 +25,11 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/SequenceSet.h" +#include "OwnershipToken.h" +#include <vector> #include <boost/function.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -34,6 +37,7 @@ namespace broker { class Channel; class Connection; class Broker; +class Queue; /** * Per-channel protocol adapter. @@ -44,7 +48,7 @@ class Broker; * peer. * */ -class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations + class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations { public: SessionAdapter(SemanticState& session); @@ -116,12 +120,15 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations shared_ptr<Exchange> alternate); }; - class QueueHandlerImpl : - public Queue010Handler, - public HandlerHelper + class QueueHandlerImpl : public Queue010Handler, + public HandlerHelper, public OwnershipToken { + Broker& broker; + std::vector< boost::shared_ptr<Queue> > exclusiveQueues; + public: - QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + QueueHandlerImpl(SemanticState& session); + ~QueueHandlerImpl(); void declare(const std::string& queue, const std::string& alternateExchange, diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt index 8ae41b1d87..70cd3d19ed 100644 --- a/qpid/python/cpp_failing_0-10.txt +++ b/qpid/python/cpp_failing_0-10.txt @@ -33,5 +33,3 @@ tests_0-10.message.MessageTests.test_recover_requeue tests_0-10.testlib.TestBaseTest.testAssertEmptyFail tests_0-10.testlib.TestBaseTest.testAssertEmptyPass tests_0-10.testlib.TestBaseTest.testMessageProperties -tests_0-10.queue.QueueTests.test_autodelete_shared -tests_0-10.queue.QueueTests.test_declare_exclusive diff --git a/qpid/python/tests_0-10/alternate_exchange.py b/qpid/python/tests_0-10/alternate_exchange.py index 418a8c90ee..c177c3deb7 100644 --- a/qpid/python/tests_0-10/alternate_exchange.py +++ b/qpid/python/tests_0-10/alternate_exchange.py @@ -111,14 +111,13 @@ class AlternateExchangeTests(TestBase010): session = self.session session.exchange_declare(exchange="alternate", type="fanout") - session = self.conn.session("alternate", 2) - session.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate") + session2 = self.conn.session("alternate", 2) + session2.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate") try: - session.exchange_delete(exchange="alternate") + session2.exchange_delete(exchange="alternate") self.fail("Expected deletion of in-use alternate-exchange to fail") except SessionException, e: session = self.session - session.queue_delete(queue="q") session.exchange_delete(exchange="alternate") self.assertEquals(530, e.args[0].error_code) diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py index 39305d4554..758794dd52 100644 --- a/qpid/python/tests_0-10/queue.py +++ b/qpid/python/tests_0-10/queue.py @@ -86,19 +86,17 @@ class QueueTests(TestBase010): Test that the exclusive field is honoured in queue.declare """ # TestBase.setUp has already opened session(1) - c1 = self.session + s1 = self.session # Here we open a second separate connection: - other = self.connect() - c2 = other.session(1) - c2.session_open() + s2 = self.conn.session("other", 2) #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) try: #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: + except SessionException, e: self.assertEquals(405, e.args[0].error_code) @@ -322,24 +320,23 @@ class QueueTests(TestBase010): Test auto-deletion (of non-exclusive queues) """ session = self.session - other = self.connect() - session2 = other.session(1) - session2.session_open() + session2 =self.conn.session("other", 1) session.queue_declare(queue="auto-delete-me", auto_delete=True) #consume from both sessions - reply = session.basic_consume(queue="auto-delete-me") - session2.basic_consume(queue="auto-delete-me") + tag = "my-tag" + session.message_subscribe(queue="auto-delete-me", destination=tag) + session2.message_subscribe(queue="auto-delete-me", destination=tag) #implicit cancel - session2.session_close() + session2.close() #check it is still there session.queue_declare(queue="auto-delete-me", passive=True) #explicit cancel => queue is now unused again: - session.basic_cancel(consumer_tag=reply.consumer_tag) + session.message_cancel(destination=tag) #NOTE: this assumes there is no timeout in use diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 59298bad1b..5aef2b00e8 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -30,23 +30,30 @@ class TxTests(TestBase010): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + session = self.session + + #declare queues and create subscribers in the checking session + #to ensure that the queues are not auto-deleted too early: + self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) + session.message_subscribe(queue="tx-commit-a", destination="qa") + session.message_subscribe(queue="tx-commit-b", destination="qb") + session.message_subscribe(queue="tx-commit-c", destination="qc") + + #use a separate session for actual work session2 = self.conn.session("worker", 2) self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") session2.tx_commit() session2.close() - #use a different session with new subscriptions to ensure - #there is no redelivery of acked messages: - session = self.session session.tx_select() - self.subscribe(session, queue="tx-commit-a", destination="qa") + self.enable_flow("qa") queue_a = session.incoming("qa") - self.subscribe(session, queue="tx-commit-b", destination="qb") + self.enable_flow("qb") queue_b = session.incoming("qb") - self.subscribe(session, queue="tx-commit-c", destination="qc") + self.enable_flow("qc") queue_c = session.incoming("qc") #check results @@ -76,6 +83,12 @@ class TxTests(TestBase010): """ Test that a session closed with an open transaction is effectively rolled back """ + session = self.session + self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) + session.message_subscribe(queue="tx-autorollback-a", destination="qa") + session.message_subscribe(queue="tx-autorollback-b", destination="qb") + session.message_subscribe(queue="tx-autorollback-c", destination="qc") + session2 = self.conn.session("worker", 2) queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") @@ -87,16 +100,15 @@ class TxTests(TestBase010): session2.close() - session = self.session session.tx_select() - self.subscribe(session, queue="tx-autorollback-a", destination="qa") + self.enable_flow("qa") queue_a = session.incoming("qa") - self.subscribe(session, queue="tx-autorollback-b", destination="qb") + self.enable_flow("qb") queue_b = session.incoming("qb") - self.subscribe(session, queue="tx-autorollback-c", destination="qc") + self.enable_flow("qc") queue_c = session.incoming("qc") #check results @@ -169,9 +181,7 @@ class TxTests(TestBase010): commit and rollback """ #setup: - session.queue_declare(queue=name_a, exclusive=True, auto_delete=True) - session.queue_declare(queue=name_b, exclusive=True, auto_delete=True) - session.queue_declare(queue=name_c, exclusive=True, auto_delete=True) + self.declare_queues([name_a, name_b, name_c]) key = "my_key_" + name_b topic = "my_topic_" + name_c @@ -232,6 +242,11 @@ class TxTests(TestBase010): session.message_transfer(message=Message(dp, mp, "TxMessage 7")) return queue_a, queue_b, queue_c, acked + def declare_queues(self, names, session=None): + session = session or self.session + for n in names: + session.queue_declare(queue=n, auto_delete=True) + def subscribe(self, session=None, **keys): session = session or self.session consumer_tag = keys["destination"] @@ -239,6 +254,11 @@ class TxTests(TestBase010): session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + def enable_flow(self, tag, session=None): + session = session or self.session + session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF) + def complete(self, session, msg): session.receiver._completed.add(msg.id)#TODO: this may be done automatically session.channel.session_completed(session.receiver._completed) |
