diff options
| author | Alan Conway <aconway@apache.org> | 2008-06-06 20:23:28 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-06-06 20:23:28 +0000 |
| commit | fb1f5c770c551fe526adf5b860dd72cf5eb07311 (patch) | |
| tree | 79a0d3ccb278e51b9ec5213b038b903d768c2727 /cpp/src/qpid/client | |
| parent | 76c922baf182bb367feed2ec014e7cab9db7f79d (diff) | |
| download | qpid-python-fb1f5c770c551fe526adf5b860dd72cf5eb07311.tar.gz | |
Added exceptions to sys::Waitable.
Fixed client side deadlock involving client::Bounds.
Fixed incorrect exception messages during connection shutdown.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@664114 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Bounds.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Bounds.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 14 |
6 files changed, 52 insertions, 64 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp index 1df21db941..aac18022bc 100644 --- a/cpp/src/qpid/client/Bounds.cpp +++ b/cpp/src/qpid/client/Bounds.cpp @@ -1,49 +1,40 @@ #include "Bounds.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Waitable.h" namespace qpid { namespace client { -using sys::Monitor; +using sys::Waitable; Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} -bool Bounds::expand(size_t sizeRequired, bool block) -{ - if (max) { - Monitor::ScopedLock l(lock); - current += sizeRequired; - if (block) { - while (current > max) { - QPID_LOG(debug, "Waiting for bounds: " << *this); - lock.wait(); - } - QPID_LOG(debug, "Bounds ok: " << *this); - } - return current <= max; - } else { - return true; +bool Bounds::expand(size_t sizeRequired, bool block) { + if (!max) return true; + Waitable::ScopedLock l(lock); + current += sizeRequired; + if (block) { + Waitable::ScopedWait w(lock); + while (current > max) + lock.wait(); } + return current <= max; } -void Bounds::reduce(size_t size) -{ +void Bounds::reduce(size_t size) { if (!max || size == 0) return; - Monitor::ScopedLock l(lock); + Waitable::ScopedLock l(lock); if (current == 0) return; - bool needNotify = current > max; current -= std::min(size, current); - if (needNotify && current < max) { - //todo: notify one at a time, but ensure that all threads are - //eventually notified - lock.notifyAll(); + if (current < max && lock.hasWaiters()) { + assert(lock.hasWaiters() == 1); + lock.notify(); } } -size_t Bounds::getCurrentSize() -{ - Monitor::ScopedLock l(lock); +size_t Bounds::getCurrentSize() { + Waitable::ScopedLock l(lock); return current; } @@ -52,4 +43,10 @@ std::ostream& operator<<(std::ostream& out, const Bounds& bounds) { return out; } +void Bounds::setException(const sys::ExceptionHolder& e) { + Waitable::ScopedLock l(lock); + lock.setException(e); + lock.waitWaiters(); // Wait for waiting threads to exit. +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Bounds.h b/cpp/src/qpid/client/Bounds.h index db18becce3..838fcb8368 100644 --- a/cpp/src/qpid/client/Bounds.h +++ b/cpp/src/qpid/client/Bounds.h @@ -20,7 +20,7 @@ * under the License. * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" namespace qpid{ namespace client{ @@ -32,10 +32,11 @@ class Bounds bool expand(size_t, bool block); void reduce(size_t); size_t getCurrentSize(); - + void setException(const sys::ExceptionHolder&); + private: friend std::ostream& operator<<(std::ostream&, const Bounds&); - sys::Monitor lock; + sys::Waitable lock; const size_t max; size_t current; }; diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index df1afb87a9..05f6bb9733 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -83,11 +83,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) void ConnectionHandler::outgoing(AMQFrame& frame) { - if (getState() == OPEN) { + if (getState() == OPEN) out(frame); - } else { - throw Exception("Connection is not open."); - } + else + throw Exception(errorText.empty() ? "Connection is not open." : errorText); } void ConnectionHandler::waitForOpen() diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 81eda0bffb..22f10d3620 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -119,41 +119,32 @@ void ConnectionImpl::close() closed(NORMAL, "Closed by client"); } -// Set closed flags and erase the sessions map, but keep the contents -// so sessions can be updated outside the lock. -ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + +template <class F> void ConnectionImpl::closeInternal(const F& f) { isClosed = true; connector.close(); - SessionVector save; - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); - if (s) save.push_back(s); + if (s) f(s); } sessions.clear(); - return save; } -void ConnectionImpl::closed(uint16_t code, const std::string& text) -{ - SessionVector save; - { - Mutex::ScopedLock l(lock); - save = closeInternal(l); - } - std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); +void ConnectionImpl::closed(uint16_t code, const std::string& text) { + Mutex::ScopedLock l(lock); + setException(new ConnectionException(code, text)); + closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } static const std::string CONN_CLOSED("Connection closed by broker"); -void ConnectionImpl::shutdown() -{ +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); + // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here. + setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED)); if (isClosed) return; - SessionVector save(closeInternal(l)); handler.fail(CONN_CLOSED); - Mutex::ScopedUnlock u(lock); - std::for_each(save.begin(), save.end(), - boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 655bca359b..089e73335d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -49,7 +49,6 @@ class ConnectionImpl : public Bounds, { typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; - typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector; SessionMap sessions; ConnectionHandler handler; @@ -59,9 +58,8 @@ class ConnectionImpl : public Bounds, bool isClosed; bool isClosing; - template <class F> void detachAll(const F&); + template <class F> void closeInternal(const F&); - SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 66e1b9e40f..7b8cae943f 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -69,12 +69,14 @@ SessionImpl::SessionImpl(const std::string& name, } SessionImpl::~SessionImpl() { - Lock l(state); - if (state != DETACHED) { - QPID_LOG(warning, "Detaching deleted session"); - setState(DETACHED); - handleClosed(); - state.waitWaiters(); + { + Lock l(state); + if (state != DETACHED) { + QPID_LOG(warning, "Detaching deleted session"); + setState(DETACHED); + handleClosed(); + state.waitWaiters(); + } } connection->erase(channel); } |
