summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build/cmake/ConfigureChecks.cmake1
-rw-r--r--lib/cpp/src/thrift/concurrency/BoostMutex.cpp4
-rw-r--r--lib/cpp/src/thrift/concurrency/Mutex.cpp86
-rw-r--r--lib/cpp/src/thrift/concurrency/Mutex.h11
-rw-r--r--lib/cpp/src/thrift/concurrency/StdMutex.cpp2
-rw-r--r--lib/cpp/src/thrift/concurrency/ThreadManager.cpp4
-rw-r--r--lib/cpp/test/CMakeLists.txt3
-rwxr-xr-xlib/cpp/test/Makefile.am3
-rw-r--r--lib/cpp/test/concurrency/MutexTest.cpp123
-rw-r--r--lib/cpp/test/concurrency/RWMutexStarveTest.cpp (renamed from lib/cpp/test/RWMutexStarveTest.cpp)3
10 files changed, 190 insertions, 50 deletions
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake
index 81223d84b..12a50df91 100644
--- a/build/cmake/ConfigureChecks.cmake
+++ b/build/cmake/ConfigureChecks.cmake
@@ -46,6 +46,7 @@ check_include_file(sys/un.h HAVE_SYS_UN_H)
check_include_file(sys/poll.h HAVE_SYS_POLL_H)
check_include_file(sys/select.h HAVE_SYS_SELECT_H)
check_include_file(sched.h HAVE_SCHED_H)
+check_include_file(string.h HAVE_STRING_H)
check_include_file(strings.h HAVE_STRINGS_H)
check_function_exists(gethostbyname HAVE_GETHOSTBYNAME)
diff --git a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
index f7cadabcc..4e556df17 100644
--- a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
@@ -33,7 +33,9 @@ namespace thrift {
namespace concurrency {
/**
- * Implementation of Mutex class using boost interprocess mutex
+ * Implementation of Mutex class using boost::timed_mutex
+ *
+ * Methods throw boost::lock_error on error.
*
* @version $Id:$
*/
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp
index b6b915d64..bcab05ec3 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp
@@ -17,18 +17,24 @@
* under the License.
*/
+// needed to test for pthread implementation capabilities:
+#define __USE_GNU
+
#include <thrift/thrift-config.h>
#include <thrift/Thrift.h>
+#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Util.h>
#include <assert.h>
-#ifdef HAVE_PTHREAD_H
+#include <stdlib.h>
#include <pthread.h>
-#endif
#include <signal.h>
+#include <string.h>
+#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
using boost::shared_ptr;
namespace apache {
@@ -110,9 +116,17 @@ static inline int64_t maybeGetProfilingStartTime() {
#define PROFILE_MUTEX_UNLOCKED()
#endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
+#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR)
+#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } }
+#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); }
+#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } }
+#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); }
+
/**
* Implementation of Mutex class using POSIX mutex
*
+ * Throws apache::thrift::concurrency::SystemResourceException on error.
+ *
* @version $Id:$
*/
class Mutex::impl {
@@ -128,19 +142,19 @@ public:
~impl() {
if (initialized_) {
initialized_ = false;
- int ret = pthread_mutex_destroy(&pthread_mutex_);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_));
}
}
void lock() const {
PROFILE_MUTEX_START_LOCK();
- pthread_mutex_lock(&pthread_mutex_);
+ THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_));
PROFILE_MUTEX_LOCKED();
}
- bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+ bool trylock() const {
+ THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_));
+ }
bool timedlock(int64_t milliseconds) const {
#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
@@ -148,14 +162,16 @@ public:
struct THRIFT_TIMESPEC ts;
Util::toTimespec(ts, milliseconds + Util::currentTime());
- int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+ EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts));
if (ret == 0) {
PROFILE_MUTEX_LOCKED();
return true;
+ } else if (ret == ETIMEDOUT) {
+ PROFILE_MUTEX_NOT_LOCKED();
+ return false;
}
- PROFILE_MUTEX_NOT_LOCKED();
- return false;
+ THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret);
#else
/* Otherwise follow solution used by Mono for Android */
struct THRIFT_TIMESPEC sleepytime, now, to;
@@ -180,7 +196,7 @@ public:
void unlock() const {
PROFILE_MUTEX_START_UNLOCK();
- pthread_mutex_unlock(&pthread_mutex_);
+ THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_));
PROFILE_MUTEX_UNLOCKED();
}
@@ -219,28 +235,16 @@ void Mutex::unlock() const {
void Mutex::DEFAULT_INITIALIZER(void* arg) {
pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
- int ret = pthread_mutex_init(pthread_mutex, NULL);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL));
}
-#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) \
- || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
+#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
static void init_with_kind(pthread_mutex_t* mutex, int kind) {
pthread_mutexattr_t mutexattr;
- int ret = pthread_mutexattr_init(&mutexattr);
- assert(ret == 0);
-
- // Apparently, this can fail. Should we really be aborting?
- ret = pthread_mutexattr_settype(&mutexattr, kind);
- assert(ret == 0);
-
- ret = pthread_mutex_init(mutex, &mutexattr);
- assert(ret == 0);
-
- ret = pthread_mutexattr_destroy(&mutexattr);
- assert(ret == 0);
- THRIFT_UNUSED_VARIABLE(ret);
+ THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr));
+ THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind));
+ THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr));
+ THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr));
}
#endif
@@ -258,6 +262,12 @@ void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
}
#endif
+#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
+void Mutex::ERRORCHECK_INITIALIZER(void* arg) {
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK);
+}
+#endif
+
#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
void Mutex::RECURSIVE_INITIALIZER(void* arg) {
init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
@@ -275,40 +285,36 @@ public:
#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
profileTime_ = 0;
#endif
- int ret = pthread_rwlock_init(&rw_lock_, NULL);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL));
initialized_ = true;
}
~impl() {
if (initialized_) {
initialized_ = false;
- int ret = pthread_rwlock_destroy(&rw_lock_);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_));
}
}
void acquireRead() const {
PROFILE_MUTEX_START_LOCK();
- pthread_rwlock_rdlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_));
PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
}
void acquireWrite() const {
PROFILE_MUTEX_START_LOCK();
- pthread_rwlock_wrlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_));
PROFILE_MUTEX_LOCKED();
}
- bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); }
+ bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); }
- bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); }
+ bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); }
void release() const {
PROFILE_MUTEX_START_UNLOCK();
- pthread_rwlock_unlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_));
PROFILE_MUTEX_UNLOCKED();
}
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h
index 6f892dcd3..e1e395edd 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.h
+++ b/lib/cpp/src/thrift/concurrency/Mutex.h
@@ -54,6 +54,11 @@ void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callbac
#endif
/**
+ * NOTE: All mutex implementations throw an exception on failure. See each
+ * specific implementation to understand the exception type(s) used.
+ */
+
+/**
* A simple mutex class
*
* @version $Id:$
@@ -64,6 +69,7 @@ public:
Mutex(Initializer init = DEFAULT_INITIALIZER);
virtual ~Mutex() {}
+
virtual void lock() const;
virtual bool trylock() const;
virtual bool timedlock(int64_t milliseconds) const;
@@ -71,8 +77,11 @@ public:
void* getUnderlyingImpl() const;
- static void DEFAULT_INITIALIZER(void*);
+ // If you attempt to use one of these and it fails to link, it means
+ // your version of pthreads does not support it - try another one.
static void ADAPTIVE_INITIALIZER(void*);
+ static void DEFAULT_INITIALIZER(void*);
+ static void ERRORCHECK_INITIALIZER(void*);
static void RECURSIVE_INITIALIZER(void*);
private:
diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
index 49c18d8d4..e0f79fa37 100644
--- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
@@ -33,6 +33,8 @@ namespace concurrency {
/**
* Implementation of Mutex class using C++11 std::timed_mutex
*
+ * Methods throw std::system_error on error.
+ *
* @version $Id:$
*/
class Mutex::impl : public std::timed_mutex {};
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index c4726dde3..88cd59a24 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -30,10 +30,6 @@
#include <deque>
#include <set>
-#if defined(DEBUG)
-#include <iostream>
-#endif // defined(DEBUG)
-
namespace apache {
namespace thrift {
namespace concurrency {
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index ef3d417a3..6d4aa5ecd 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -80,7 +80,8 @@ set(UnitTest_SOURCES
)
if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS AND NOT MSVC AND NOT MINGW)
- list(APPEND UnitTest_SOURCES RWMutexStarveTest.cpp)
+ list(APPEND UnitTest_SOURCES concurrency/MutexTest.cpp)
+ list(APPEND UnitTest_SOURCES concurrency/RWMutexStarveTest.cpp)
endif()
add_executable(UnitTests ${UnitTest_SOURCES})
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index d387297df..f61cff191 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -123,7 +123,8 @@ UnitTests_SOURCES = \
if !WITH_BOOSTTHREADS
UnitTests_SOURCES += \
- RWMutexStarveTest.cpp
+ concurrency/MutexTest.cpp \
+ concurrency/RWMutexStarveTest.cpp
endif
UnitTests_LDADD = \
diff --git a/lib/cpp/test/concurrency/MutexTest.cpp b/lib/cpp/test/concurrency/MutexTest.cpp
new file mode 100644
index 000000000..781ec1a40
--- /dev/null
+++ b/lib/cpp/test/concurrency/MutexTest.cpp
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// This is linked into the UnitTests test executable
+
+#include <boost/test/unit_test.hpp>
+
+#include "thrift/concurrency/Exception.h"
+#include "thrift/concurrency/Mutex.h"
+
+using boost::unit_test::test_suite;
+using boost::unit_test::framework::master_test_suite;
+
+using namespace apache::thrift::concurrency;
+
+struct LFAT
+{
+ LFAT()
+ : uut(Mutex::ERRORCHECK_INITIALIZER)
+ {
+ BOOST_CHECK_EQUAL(0, pthread_mutex_init(&mx, 0));
+ BOOST_CHECK_EQUAL(0, pthread_cond_init(&cv, 0));
+ }
+
+ Mutex uut;
+ pthread_mutex_t mx;
+ pthread_cond_t cv;
+};
+
+// Helper for testing mutex behavior when locked by another thread
+void * lockFromAnotherThread(void *ptr)
+{
+ struct LFAT *lfat = (LFAT *)ptr;
+ BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat->mx)); // synchronize with testing thread
+ BOOST_CHECK_NO_THROW( lfat->uut.lock());
+ BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat->cv)); // tell testing thread we have locked the mutex
+ BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat->cv, &lfat->mx)); // wait for testing thread to signal condition variable telling us to unlock
+ BOOST_CHECK_NO_THROW( lfat->uut.unlock());
+ return ptr; // testing thread should join to ensure completeness
+}
+
+BOOST_AUTO_TEST_SUITE(MutexTest)
+
+BOOST_AUTO_TEST_CASE(happy_path)
+{
+ Mutex uut(Mutex::ERRORCHECK_INITIALIZER); // needed to test unlocking twice without undefined behavior
+
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_THROW ( uut.lock(), SystemResourceException); // EDEADLK (this thread owns it)
+ BOOST_CHECK_NO_THROW( uut.unlock());
+}
+
+BOOST_AUTO_TEST_CASE(recursive_happy_path)
+{
+ Mutex uut(Mutex::RECURSIVE_INITIALIZER);
+
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+ BOOST_CHECK_NO_THROW( uut.lock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+}
+
+BOOST_AUTO_TEST_CASE(trylock)
+{
+ Mutex uut(Mutex::ADAPTIVE_INITIALIZER); // just using another initializer for coverage
+
+ BOOST_CHECK ( uut.trylock());
+ BOOST_CHECK (!uut.trylock());
+ BOOST_CHECK_NO_THROW( uut.unlock());
+}
+
+BOOST_AUTO_TEST_CASE(timedlock)
+{
+ pthread_t th;
+ struct LFAT lfat;
+
+ BOOST_CHECK ( lfat.uut.timedlock(100));
+ BOOST_CHECK_THROW ( lfat.uut.timedlock(100),
+ SystemResourceException); // EDEADLK (current thread owns mutex - logic error)
+ BOOST_CHECK_NO_THROW( lfat.uut.unlock());
+
+ BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat.mx)); // synchronize with helper thread
+ BOOST_CHECK_EQUAL (0, pthread_create(&th, NULL,
+ lockFromAnotherThread, &lfat)); // create helper thread
+ BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat.cv, &lfat.mx)); // wait for helper thread to lock mutex
+
+ BOOST_CHECK (!lfat.uut.timedlock(100)); // false: another thread owns the lock
+
+ BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat.cv)); // tell helper thread we are done
+ BOOST_CHECK_EQUAL (0, pthread_mutex_unlock(&lfat.mx)); // let helper thread clean up
+ BOOST_CHECK_EQUAL (0, pthread_join(th, 0)); // wait for testing thread to unlock and be done
+}
+
+BOOST_AUTO_TEST_CASE(underlying)
+{
+ Mutex uut;
+
+ BOOST_CHECK ( uut.getUnderlyingImpl());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/RWMutexStarveTest.cpp b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp
index 32c1531be..63d780fa1 100644
--- a/lib/cpp/test/RWMutexStarveTest.cpp
+++ b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp
@@ -17,8 +17,7 @@
* under the License.
*/
-#include <iostream>
-#include <unistd.h>
+// This is linked into the UnitTests test executable
#include <boost/shared_ptr.hpp>
#include <boost/test/unit_test.hpp>