diff options
author | Ted Ross <tross@apache.org> | 2009-06-08 20:23:19 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-06-08 20:23:19 +0000 |
commit | 87d3337eefa96eceeb0247039b0738352ef91130 (patch) | |
tree | 6fce254c404993e0f2d651b58830f31b238907b7 | |
parent | 5fe7eb38df0c0e205e9663ac6056aeef2ac0bc8b (diff) | |
download | qpid-python-87d3337eefa96eceeb0247039b0738352ef91130.tar.gz |
Bugfixes in the c++ console API:
- Connection threads now shut down cleanly
- get-query timeouts now work properly
- waitForStable now only waits for connected brokers
The ping example was improved. It now more cleanly handles connection loss/reconnect.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782766 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/examples/qmf-console/ping.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/console/Broker.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/console/Broker.h | 39 | ||||
-rw-r--r-- | cpp/src/qpid/console/SessionManager.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/console/SessionManager.h | 2 |
5 files changed, 83 insertions, 36 deletions
diff --git a/cpp/examples/qmf-console/ping.cpp b/cpp/examples/qmf-console/ping.cpp index 39ec0d3039..405c15f1c4 100644 --- a/cpp/examples/qmf-console/ping.cpp +++ b/cpp/examples/qmf-console/ping.cpp @@ -43,6 +43,7 @@ int main_int(int /*argc*/, char** /*argv*/) // SessionManager::Settings smSettings; smSettings.methodTimeout = 2; + smSettings.getTimeout = 2; // // Declare the console session manager. With a null listener argument, it defaults to @@ -58,18 +59,21 @@ int main_int(int /*argc*/, char** /*argv*/) uint32_t count = 5; // The number of echo requests we will send to the broker. Object::Vector list; // A container for holding objects retrieved from the broker. - // - // Query for a list of 'broker' objects from the Management Database - // - sm.getObjects(list, "broker"); + for (uint32_t iter = 0; iter < count; iter++) { + cout << "Ping Broker: " << broker->getUrl() << "... "; + cout.flush(); - // - // We expect one object (since we are connected to only one broker) - // - if (list.size() == 1) { - Object& brokerObject = *(list.begin()); + // + // Query for a list of 'broker' objects from the Management Database + // + sm.getObjects(list, "broker"); + + // + // We expect one object (since we are connected to only one broker) + // + if (list.size() == 1) { + Object& brokerObject = *(list.begin()); - for (uint32_t iter = 0; iter < count; iter++) { // // Declare a container for arguments to be sent with the "echo" method // that we will invoke on the remote "broker" object. @@ -87,9 +91,6 @@ int main_int(int /*argc*/, char** /*argv*/) args.addUint("sequence", iter); args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"); - cout << "Ping Broker: " << broker->getUrl() << "... "; - cout.flush(); - // // Invoke the method. This is a synchronous operation that will block until // the method completes and returns a result. @@ -109,6 +110,9 @@ int main_int(int /*argc*/, char** /*argv*/) if (result.code == 0 && iter < count - 1) qpid::sys::sleep(1); + } else { + cout << "Disconnected..." << endl; + qpid::sys::sleep(1); } } diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp index 789b90eaaf..4f90afd39a 100644 --- a/cpp/src/qpid/console/Broker.cpp +++ b/cpp/src/qpid/console/Broker.cpp @@ -57,6 +57,8 @@ Broker::Broker(SessionManager& sm, ConnectionSettings& settings) : Broker::~Broker() { + connThreadBody.shutdown(); + connThread.join(); } string Broker::getUrl() const @@ -184,6 +186,8 @@ void Broker::ConnectionThread::run() subscriptions->setFlowControl(dest, FlowControl::unlimited()); { Mutex::ScopedLock _lock(connLock); + if (shuttingDown) + return; operational = true; broker.resetAgents(); broker.connected = true; @@ -199,16 +203,26 @@ void Broker::ConnectionThread::run() broker.sessionManager.handleBrokerDisconnect(&broker); } delay = delayMin; + connection.close(); delete subscriptions; subscriptions = 0; - session.close(); } catch (std::exception &e) { QPID_LOG(debug, " outer exception: " << e.what()); if (delay < delayMax) delay *= delayFactor; } - ::sleep(delay); + { + Mutex::ScopedLock _lock(connLock); + if (shuttingDown) + return; + { + Mutex::ScopedUnlock _unlock(connLock); + ::sleep(delay); + } + if (shuttingDown) + return; + } } } @@ -253,6 +267,16 @@ void Broker::ConnectionThread::bindExchange(const std::string& exchange, const s arg::bindingKey=key); } +void Broker::ConnectionThread::shutdown() +{ + { + Mutex::ScopedLock _lock(connLock); + shuttingDown = true; + } + if (subscriptions) + subscriptions->stop(); +} + void Broker::waitForStable() { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/console/Broker.h b/cpp/src/qpid/console/Broker.h index ddbd973dfe..dc85d45d62 100644 --- a/cpp/src/qpid/console/Broker.h +++ b/cpp/src/qpid/console/Broker.h @@ -73,7 +73,6 @@ namespace console { SessionManager& sessionManager; AgentMap agents; - client::SubscriptionManager* subscription; bool connected; std::string error; std::string amqpSessionId; @@ -88,25 +87,27 @@ namespace console { friend class ConnectionThread; class ConnectionThread : public sys::Runnable { - bool operational; - Broker& broker; - framing::Uuid sessionId; - client::Connection connection; - client::Session session; - client::SubscriptionManager* subscriptions; - std::stringstream queueName; - sys::Mutex connLock; - void run(); - public: + bool operational; + bool shuttingDown; + Broker& broker; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + client::SubscriptionManager* subscriptions; + std::stringstream queueName; + sys::Mutex connLock; + void run(); + public: ConnectionThread(Broker& _broker) : - operational(false), broker(_broker), subscriptions(0) {} - ~ConnectionThread(); - void sendBuffer(qpid::framing::Buffer& buf, - uint32_t length, - const std::string& exchange = "qpid.management", - const std::string& routingKey = "broker"); - void bindExchange(const std::string& exchange, const std::string& key); - }; + operational(false), shuttingDown(false), broker(_broker), subscriptions(0) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + const std::string& exchange = "qpid.management", + const std::string& routingKey = "broker"); + void bindExchange(const std::string& exchange, const std::string& key); + void shutdown(); + }; ConnectionThread connThreadBody; sys::Thread connThread; diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp index feccf92b01..6cba0933e2 100644 --- a/cpp/src/qpid/console/SessionManager.cpp +++ b/cpp/src/qpid/console/SessionManager.cpp @@ -41,6 +41,13 @@ SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) : bindingKeys(); } +SessionManager::~SessionManager() +{ + for (vector<Broker*>::iterator iter = brokers.begin(); + iter != brokers.end(); iter++) + delete *iter; +} + Broker* SessionManager::addBroker(client::ConnectionSettings& settings) { Broker* broker(new Broker(*this, settings)); @@ -58,6 +65,7 @@ void SessionManager::delBroker(Broker* broker) iter != brokers.end(); iter++) if (*iter == broker) { brokers.erase(iter); + delete broker; return; } } @@ -171,6 +179,11 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas syncSequenceList.clear(); error = string(); + if (agentList.empty()) { + objects = getResult; + return; + } + for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) { Agent* agent = *iter; char rawbuffer[512]; @@ -191,8 +204,12 @@ void SessionManager::getObjects(Object::Vector& objects, const std::string& clas { Mutex::ScopedLock _lock(lock); + sys::AbsTime startTime = sys::now(); while (!syncSequenceList.empty() && error.empty()) { cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC)); + sys::AbsTime currTime = sys::now(); + if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC) + break; } } @@ -221,7 +238,8 @@ void SessionManager::allBrokersStable() Mutex::ScopedLock l(brokerListLock); for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) - (*iter)->waitForStable(); + if ((*iter)->isConnected()) + (*iter)->waitForStable(); } void SessionManager::startProtocol(Broker* broker) diff --git a/cpp/src/qpid/console/SessionManager.h b/cpp/src/qpid/console/SessionManager.h index 4341fe317c..770d4b3d28 100644 --- a/cpp/src/qpid/console/SessionManager.h +++ b/cpp/src/qpid/console/SessionManager.h @@ -52,7 +52,7 @@ class SessionManager public: typedef std::vector<std::string> NameVector; typedef std::vector<ClassKey> KeyVector; - ~SessionManager() {} + ~SessionManager(); struct Settings { bool rcvObjects; |