summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-14 18:21:50 +0000
committerAlan Conway <aconway@apache.org>2008-10-14 18:21:50 +0000
commitb17ea7f64cf3eb42c4614aa57508a7aaca132807 (patch)
tree3f79c52e3b96a19cb0ce6b591b5ea64284c41cad /cpp/src/qpid/client/FailoverSubscriptionManager.cpp
parent8ec6597d5372ccebb689483b8074085f278022e1 (diff)
downloadqpid-python-b17ea7f64cf3eb42c4614aa57508a7aaca132807.tar.gz
Bug fixes for client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704596 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/FailoverSubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.cpp115
1 files changed, 68 insertions, 47 deletions
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
index 5a790e26cd..c1ef7d00c4 100644
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
@@ -34,7 +34,8 @@ namespace client {
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
name("no_name"),
- newSessionIsValid(false)
+ newSessionIsValid(false),
+ no_failover(false)
{
subscriptionManager = new SubscriptionManager(fs->session);
fs->setFailoverSubscriptionManager(this);
@@ -45,9 +46,10 @@ FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs)
void
FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
{
- Lock l(lock);
+ sys::Monitor::ScopedLock l(lock);
newSession = _newSession;
newSessionIsValid = true;
+ // lock.notifyAll();
}
@@ -55,28 +57,11 @@ FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
void
FailoverSubscriptionManager::failover ( )
{
- // Stop the subscription manager thread so it can notice the failover in progress.
+ sys::Monitor::ScopedLock l(lock);
+ // Stop the subscription manager thread so it can notice
+ // the failover in progress.
subscriptionManager->stop();
-}
-
-
-
-
-FailoverSubscriptionManager::subscribeArgs::subscribeArgs
-( int _interface,
- MessageListener * _listener,
- LocalQueue * _localQueue,
- const std::string * _queue,
- const FlowControl * _flow,
- const std::string * _tag
-) :
- interface(_interface),
- listener(_listener),
- localQueue(_localQueue),
- queue(_queue),
- flow(_flow),
- tag(_tag)
-{
+ lock.notifyAll();
}
@@ -86,15 +71,19 @@ void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( listener,
queue,
flow,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) );
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) );
}
@@ -103,15 +92,20 @@ void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
const FlowControl & flow,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( localQueue,
queue,
flow,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) );
}
@@ -119,15 +113,19 @@ FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
void
FailoverSubscriptionManager::subscribe ( MessageListener & listener,
const std::string & queue,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( listener,
queue,
tag
);
- // TODO -- more than one subscription
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) );
}
@@ -136,14 +134,19 @@ FailoverSubscriptionManager::subscribe ( MessageListener & listener,
void
FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
const std::string & queue,
- const std::string & tag
+ const std::string & tag,
+ bool record_this
)
{
+ sys::Monitor::ScopedLock l(lock);
+
subscriptionManager->subscribe ( localQueue,
queue,
tag
);
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) );
+
+ if ( record_this )
+ subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) );
}
@@ -172,31 +175,46 @@ FailoverSubscriptionManager::cancel ( const std::string tag )
void
FailoverSubscriptionManager::run ( ) // User Thread
{
+ std::vector<subscribeFn> mySubscribeFns;
+
while ( 1 )
{
subscriptionManager->run ( );
- Lock l(lock);
+
// When we drop out of run, if there is a new Session
// waiting for us, this is a failover. Otherwise, just
// return control to usercode.
- if ( newSessionIsValid )
+
{
- delete subscriptionManager;
- subscriptionManager = new SubscriptionManager(newSession);
- for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin();
- i < subscribeFns.end();
- ++ i
- )
- {
- (*i) ();
- }
- newSessionIsValid = false;
+ sys::Monitor::ScopedLock l(lock);
+
+
+ while ( !newSessionIsValid && !no_failover )
+ lock.wait();
+
+
+ if ( newSessionIsValid )
+ {
+ newSessionIsValid = false;
+ delete subscriptionManager;
+ subscriptionManager = new SubscriptionManager(newSession);
+ mySubscribeFns.swap ( subscribeFns );
+ }
+ else
+ {
+ // Not a failover, return to user code.
+ break;
+ }
}
- else
+
+ for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin();
+ i != mySubscribeFns.end();
+ ++ i
+ )
{
- // Not a failover, return to user code.
- break;
+ (*i) ();
}
+
}
}
@@ -222,8 +240,11 @@ FailoverSubscriptionManager::setAutoStop ( bool set )
void
FailoverSubscriptionManager::stop ( )
{
+ sys::Monitor::ScopedLock l(lock);
+ no_failover = true;
subscriptionManager->stop ( );
+ lock.notifyAll();
}