From b17ea7f64cf3eb42c4614aa57508a7aaca132807 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 14 Oct 2008 18:21:50 +0000 Subject: Bug fixes for client-side failover. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704596 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/FailoverSubscriptionManager.cpp | 115 ++++++++++++--------- 1 file changed, 68 insertions(+), 47 deletions(-) (limited to 'cpp/src/qpid/client/FailoverSubscriptionManager.cpp') diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index 5a790e26cd..c1ef7d00c4 100644 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -34,7 +34,8 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : name("no_name"), - newSessionIsValid(false) + newSessionIsValid(false), + no_failover(false) { subscriptionManager = new SubscriptionManager(fs->session); fs->setFailoverSubscriptionManager(this); @@ -45,9 +46,10 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) void FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) { - Lock l(lock); + sys::Monitor::ScopedLock l(lock); newSession = _newSession; newSessionIsValid = true; + // lock.notifyAll(); } @@ -55,28 +57,11 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) void FailoverSubscriptionManager::failover ( ) { - // Stop the subscription manager thread so it can notice the failover in progress. + sys::Monitor::ScopedLock l(lock); + // Stop the subscription manager thread so it can notice + // the failover in progress. subscriptionManager->stop(); -} - - - - -FailoverSubscriptionManager::subscribeArgs::subscribeArgs -( int _interface, - MessageListener * _listener, - LocalQueue * _localQueue, - const std::string * _queue, - const FlowControl * _flow, - const std::string * _tag -) : - interface(_interface), - listener(_listener), - localQueue(_localQueue), - queue(_queue), - flow(_flow), - tag(_tag) -{ + lock.notifyAll(); } @@ -86,15 +71,19 @@ void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, const FlowControl & flow, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( listener, queue, flow, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) ); } @@ -103,15 +92,20 @@ void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, const FlowControl & flow, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( localQueue, queue, flow, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) ); } @@ -119,15 +113,19 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, void FailoverSubscriptionManager::subscribe ( MessageListener & listener, const std::string & queue, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( listener, queue, tag ); - // TODO -- more than one subscription - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) ); } @@ -136,14 +134,19 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener, void FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, const std::string & queue, - const std::string & tag + const std::string & tag, + bool record_this ) { + sys::Monitor::ScopedLock l(lock); + subscriptionManager->subscribe ( localQueue, queue, tag ); - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); + + if ( record_this ) + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) ); } @@ -172,31 +175,46 @@ FailoverSubscriptionManager::cancel ( const std::string tag ) void FailoverSubscriptionManager::run ( ) // User Thread { + std::vector mySubscribeFns; + while ( 1 ) { subscriptionManager->run ( ); - Lock l(lock); + // When we drop out of run, if there is a new Session // waiting for us, this is a failover. Otherwise, just // return control to usercode. - if ( newSessionIsValid ) + { - delete subscriptionManager; - subscriptionManager = new SubscriptionManager(newSession); - for ( std::vector::iterator i = subscribeFns.begin(); - i < subscribeFns.end(); - ++ i - ) - { - (*i) (); - } - newSessionIsValid = false; + sys::Monitor::ScopedLock l(lock); + + + while ( !newSessionIsValid && !no_failover ) + lock.wait(); + + + if ( newSessionIsValid ) + { + newSessionIsValid = false; + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + mySubscribeFns.swap ( subscribeFns ); + } + else + { + // Not a failover, return to user code. + break; + } } - else + + for ( std::vector::iterator i = mySubscribeFns.begin(); + i != mySubscribeFns.end(); + ++ i + ) { - // Not a failover, return to user code. - break; + (*i) (); } + } } @@ -222,8 +240,11 @@ FailoverSubscriptionManager::setAutoStop ( bool set ) void FailoverSubscriptionManager::stop ( ) { + sys::Monitor::ScopedLock l(lock); + no_failover = true; subscriptionManager->stop ( ); + lock.notifyAll(); } -- cgit v1.2.1