summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-06-06 20:23:28 +0000
committerAlan Conway <aconway@apache.org>2008-06-06 20:23:28 +0000
commitfb1f5c770c551fe526adf5b860dd72cf5eb07311 (patch)
tree79a0d3ccb278e51b9ec5213b038b903d768c2727 /cpp/src/qpid/client
parent76c922baf182bb367feed2ec014e7cab9db7f79d (diff)
downloadqpid-python-fb1f5c770c551fe526adf5b860dd72cf5eb07311.tar.gz
Added exceptions to sys::Waitable.
Fixed client side deadlock involving client::Bounds. Fixed incorrect exception messages during connection shutdown. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@664114 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Bounds.cpp51
-rw-r--r--cpp/src/qpid/client/Bounds.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp7
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp33
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h4
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp14
6 files changed, 52 insertions, 64 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp
index 1df21db941..aac18022bc 100644
--- a/cpp/src/qpid/client/Bounds.cpp
+++ b/cpp/src/qpid/client/Bounds.cpp
@@ -1,49 +1,40 @@
#include "Bounds.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Waitable.h"
namespace qpid {
namespace client {
-using sys::Monitor;
+using sys::Waitable;
Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {}
-bool Bounds::expand(size_t sizeRequired, bool block)
-{
- if (max) {
- Monitor::ScopedLock l(lock);
- current += sizeRequired;
- if (block) {
- while (current > max) {
- QPID_LOG(debug, "Waiting for bounds: " << *this);
- lock.wait();
- }
- QPID_LOG(debug, "Bounds ok: " << *this);
- }
- return current <= max;
- } else {
- return true;
+bool Bounds::expand(size_t sizeRequired, bool block) {
+ if (!max) return true;
+ Waitable::ScopedLock l(lock);
+ current += sizeRequired;
+ if (block) {
+ Waitable::ScopedWait w(lock);
+ while (current > max)
+ lock.wait();
}
+ return current <= max;
}
-void Bounds::reduce(size_t size)
-{
+void Bounds::reduce(size_t size) {
if (!max || size == 0) return;
- Monitor::ScopedLock l(lock);
+ Waitable::ScopedLock l(lock);
if (current == 0) return;
- bool needNotify = current > max;
current -= std::min(size, current);
- if (needNotify && current < max) {
- //todo: notify one at a time, but ensure that all threads are
- //eventually notified
- lock.notifyAll();
+ if (current < max && lock.hasWaiters()) {
+ assert(lock.hasWaiters() == 1);
+ lock.notify();
}
}
-size_t Bounds::getCurrentSize()
-{
- Monitor::ScopedLock l(lock);
+size_t Bounds::getCurrentSize() {
+ Waitable::ScopedLock l(lock);
return current;
}
@@ -52,4 +43,10 @@ std::ostream& operator<<(std::ostream& out, const Bounds& bounds) {
return out;
}
+void Bounds::setException(const sys::ExceptionHolder& e) {
+ Waitable::ScopedLock l(lock);
+ lock.setException(e);
+ lock.waitWaiters(); // Wait for waiting threads to exit.
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Bounds.h b/cpp/src/qpid/client/Bounds.h
index db18becce3..838fcb8368 100644
--- a/cpp/src/qpid/client/Bounds.h
+++ b/cpp/src/qpid/client/Bounds.h
@@ -20,7 +20,7 @@
* under the License.
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
namespace qpid{
namespace client{
@@ -32,10 +32,11 @@ class Bounds
bool expand(size_t, bool block);
void reduce(size_t);
size_t getCurrentSize();
-
+ void setException(const sys::ExceptionHolder&);
+
private:
friend std::ostream& operator<<(std::ostream&, const Bounds&);
- sys::Monitor lock;
+ sys::Waitable lock;
const size_t max;
size_t current;
};
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index df1afb87a9..05f6bb9733 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -83,11 +83,10 @@ void ConnectionHandler::incoming(AMQFrame& frame)
void ConnectionHandler::outgoing(AMQFrame& frame)
{
- if (getState() == OPEN) {
+ if (getState() == OPEN)
out(frame);
- } else {
- throw Exception("Connection is not open.");
- }
+ else
+ throw Exception(errorText.empty() ? "Connection is not open." : errorText);
}
void ConnectionHandler::waitForOpen()
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 81eda0bffb..22f10d3620 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -119,41 +119,32 @@ void ConnectionImpl::close()
closed(NORMAL, "Closed by client");
}
-// Set closed flags and erase the sessions map, but keep the contents
-// so sessions can be updated outside the lock.
-ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
+
+template <class F> void ConnectionImpl::closeInternal(const F& f) {
isClosed = true;
connector.close();
- SessionVector save;
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
- if (s) save.push_back(s);
+ if (s) f(s);
}
sessions.clear();
- return save;
}
-void ConnectionImpl::closed(uint16_t code, const std::string& text)
-{
- SessionVector save;
- {
- Mutex::ScopedLock l(lock);
- save = closeInternal(l);
- }
- std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text));
+void ConnectionImpl::closed(uint16_t code, const std::string& text) {
+ Mutex::ScopedLock l(lock);
+ setException(new ConnectionException(code, text));
+ closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
static const std::string CONN_CLOSED("Connection closed by broker");
-void ConnectionImpl::shutdown()
-{
+void ConnectionImpl::shutdown() {
Mutex::ScopedLock l(lock);
+ // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here.
+ setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED));
if (isClosed) return;
- SessionVector save(closeInternal(l));
handler.fail(CONN_CLOSED);
- Mutex::ScopedUnlock u(lock);
- std::for_each(save.begin(), save.end(),
- boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
+ closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 655bca359b..089e73335d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -49,7 +49,6 @@ class ConnectionImpl : public Bounds,
{
typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
- typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector;
SessionMap sessions;
ConnectionHandler handler;
@@ -59,9 +58,8 @@ class ConnectionImpl : public Bounds,
bool isClosed;
bool isClosing;
- template <class F> void detachAll(const F&);
+ template <class F> void closeInternal(const F&);
- SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
void idleOut();
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 66e1b9e40f..7b8cae943f 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -69,12 +69,14 @@ SessionImpl::SessionImpl(const std::string& name,
}
SessionImpl::~SessionImpl() {
- Lock l(state);
- if (state != DETACHED) {
- QPID_LOG(warning, "Detaching deleted session");
- setState(DETACHED);
- handleClosed();
- state.waitWaiters();
+ {
+ Lock l(state);
+ if (state != DETACHED) {
+ QPID_LOG(warning, "Detaching deleted session");
+ setState(DETACHED);
+ handleClosed();
+ state.waitWaiters();
+ }
}
connection->erase(channel);
}