diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /cpp/src/qmf/ConsoleSession.cpp | |
parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 125 |
1 files changed, 18 insertions, 107 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index 2dfc894c58..e12c1152f6 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -54,7 +54,6 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); void ConsoleSession::open() { impl->open(); } void ConsoleSession::close() { impl->close(); } bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } -int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } @@ -66,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), - opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) + connection(c), domain("default"), maxAgentAgeMinutes(5), + opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -92,14 +91,7 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); - - iter = optMap.find("max-thread-wait-time"); - if (iter != optMap.end()) - maxThreadWaitTime = iter->second.asUint32(); } - - if (maxThreadWaitTime > 60) - maxThreadWaitTime = 60; } @@ -107,11 +99,6 @@ ConsoleSessionImpl::~ConsoleSessionImpl() { if (opened) close(); - - if (thread) { - thread->join(); - delete thread; - } } @@ -166,12 +153,6 @@ void ConsoleSessionImpl::open() if (opened) throw QmfException("The session is already open"); - // If the thread exists, join and delete it before creating a new one. - if (thread) { - thread->join(); - delete thread; - } - // Establish messaging addresses directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; @@ -200,36 +181,30 @@ void ConsoleSessionImpl::open() // Start the receiver thread threadCanceled = false; - opened = true; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); if (agentQuery) sendAgentLocate(); + + opened = true; } -void ConsoleSessionImpl::closeAsync() +void ConsoleSessionImpl::close() { if (!opened) throw QmfException("The session is already closed"); - // Stop the receiver thread. Don't join it until the destructor is called or open() is called. + // Stop and join the receiver thread threadCanceled = true; - opened = false; -} - + thread->join(); + delete thread; -void ConsoleSessionImpl::close() -{ - closeAsync(); - - if (thread) { - thread->join(); - delete thread; - thread = 0; - } + // Close the AMQP session + session.close(); + opened = false; } @@ -238,19 +213,13 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) { - int64_t nsecs(qpid::sys::TIME_INFINITE); - if ((uint64_t)(nsecs / 1000000) > milliseconds) - nsecs = (int64_t) milliseconds * 1000000; - qpid::sys::Duration then(nsecs); - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); - } + if (eventQueue.empty()) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); - if (eventQueue.empty()) - alertEventNotifierLH(false); return true; } @@ -258,27 +227,6 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) } -int ConsoleSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - -void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier) -{ - qpid::sys::Mutex::ScopedLock l(lock); - eventNotifier = notifier; -} - - -EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventNotifier; -} - - uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -320,10 +268,8 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) { bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) { + if (notify) cond.notify(); - alertEventNotifierLH(true); - } } @@ -475,23 +421,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian iter = content.find("_values"); if (iter == content.end()) return; - const Variant::Map& in_attrs(iter->second.asMap()); - Variant::Map attrs; - - // - // Copy the map from the message to "attrs". Translate any old-style - // keys to their new key values in the process. - // - for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { - if (iter->first == "epoch") - attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; - else if (iter->first == "timestamp") - attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; - else if (iter->first == "heartbeat_interval") - attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; - else - attrs[iter->first] = iter->second; - } + Variant::Map attrs(iter->second.asMap()); iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) @@ -632,13 +562,6 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) } -void ConsoleSessionImpl::alertEventNotifierLH(bool readable) -{ - if (eventNotifier) - eventNotifier->setReadable(readable); -} - - void ConsoleSessionImpl::run() { QPID_LOG(debug, "ConsoleSession thread started"); @@ -649,7 +572,7 @@ void ConsoleSessionImpl::run() qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); + bool valid = session.nextReceiver(rx, Duration::SECOND); if (threadCanceled) break; if (valid) { @@ -666,18 +589,6 @@ void ConsoleSessionImpl::run() enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); } - session.close(); QPID_LOG(debug, "ConsoleSession thread exiting"); } - -ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session) -{ - return *session.impl; -} - - -const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session) -{ - return *session.impl; -} |