diff options
-rw-r--r-- | build/cmake/ConfigureChecks.cmake | 1 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/BoostMutex.cpp | 4 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/Mutex.cpp | 86 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/Mutex.h | 11 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/StdMutex.cpp | 2 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/ThreadManager.cpp | 4 | ||||
-rw-r--r-- | lib/cpp/test/CMakeLists.txt | 3 | ||||
-rwxr-xr-x | lib/cpp/test/Makefile.am | 3 | ||||
-rw-r--r-- | lib/cpp/test/concurrency/MutexTest.cpp | 123 | ||||
-rw-r--r-- | lib/cpp/test/concurrency/RWMutexStarveTest.cpp (renamed from lib/cpp/test/RWMutexStarveTest.cpp) | 3 |
10 files changed, 190 insertions, 50 deletions
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake index 81223d84b..12a50df91 100644 --- a/build/cmake/ConfigureChecks.cmake +++ b/build/cmake/ConfigureChecks.cmake @@ -46,6 +46,7 @@ check_include_file(sys/un.h HAVE_SYS_UN_H) check_include_file(sys/poll.h HAVE_SYS_POLL_H) check_include_file(sys/select.h HAVE_SYS_SELECT_H) check_include_file(sched.h HAVE_SCHED_H) +check_include_file(string.h HAVE_STRING_H) check_include_file(strings.h HAVE_STRINGS_H) check_function_exists(gethostbyname HAVE_GETHOSTBYNAME) diff --git a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp index f7cadabcc..4e556df17 100644 --- a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp +++ b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp @@ -33,7 +33,9 @@ namespace thrift { namespace concurrency { /** - * Implementation of Mutex class using boost interprocess mutex + * Implementation of Mutex class using boost::timed_mutex + * + * Methods throw boost::lock_error on error. * * @version $Id:$ */ diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp index b6b915d64..bcab05ec3 100644 --- a/lib/cpp/src/thrift/concurrency/Mutex.cpp +++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp @@ -17,18 +17,24 @@ * under the License. */ +// needed to test for pthread implementation capabilities: +#define __USE_GNU + #include <thrift/thrift-config.h> #include <thrift/Thrift.h> +#include <thrift/concurrency/Exception.h> #include <thrift/concurrency/Mutex.h> #include <thrift/concurrency/Util.h> #include <assert.h> -#ifdef HAVE_PTHREAD_H +#include <stdlib.h> #include <pthread.h> -#endif #include <signal.h> +#include <string.h> +#include <boost/format.hpp> +#include <boost/shared_ptr.hpp> using boost::shared_ptr; namespace apache { @@ -110,9 +116,17 @@ static inline int64_t maybeGetProfilingStartTime() { #define PROFILE_MUTEX_UNLOCKED() #endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING +#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR) +#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } } +#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); } +#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } } +#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); } + /** * Implementation of Mutex class using POSIX mutex * + * Throws apache::thrift::concurrency::SystemResourceException on error. + * * @version $Id:$ */ class Mutex::impl { @@ -128,19 +142,19 @@ public: ~impl() { if (initialized_) { initialized_ = false; - int ret = pthread_mutex_destroy(&pthread_mutex_); - THRIFT_UNUSED_VARIABLE(ret); - assert(ret == 0); + ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_)); } } void lock() const { PROFILE_MUTEX_START_LOCK(); - pthread_mutex_lock(&pthread_mutex_); + THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_)); PROFILE_MUTEX_LOCKED(); } - bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); } + bool trylock() const { + THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_)); + } bool timedlock(int64_t milliseconds) const { #if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L @@ -148,14 +162,16 @@ public: struct THRIFT_TIMESPEC ts; Util::toTimespec(ts, milliseconds + Util::currentTime()); - int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts); + EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts)); if (ret == 0) { PROFILE_MUTEX_LOCKED(); return true; + } else if (ret == ETIMEDOUT) { + PROFILE_MUTEX_NOT_LOCKED(); + return false; } - PROFILE_MUTEX_NOT_LOCKED(); - return false; + THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret); #else /* Otherwise follow solution used by Mono for Android */ struct THRIFT_TIMESPEC sleepytime, now, to; @@ -180,7 +196,7 @@ public: void unlock() const { PROFILE_MUTEX_START_UNLOCK(); - pthread_mutex_unlock(&pthread_mutex_); + THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_)); PROFILE_MUTEX_UNLOCKED(); } @@ -219,28 +235,16 @@ void Mutex::unlock() const { void Mutex::DEFAULT_INITIALIZER(void* arg) { pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg; - int ret = pthread_mutex_init(pthread_mutex, NULL); - THRIFT_UNUSED_VARIABLE(ret); - assert(ret == 0); + THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL)); } -#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) \ - || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP) +#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP) static void init_with_kind(pthread_mutex_t* mutex, int kind) { pthread_mutexattr_t mutexattr; - int ret = pthread_mutexattr_init(&mutexattr); - assert(ret == 0); - - // Apparently, this can fail. Should we really be aborting? - ret = pthread_mutexattr_settype(&mutexattr, kind); - assert(ret == 0); - - ret = pthread_mutex_init(mutex, &mutexattr); - assert(ret == 0); - - ret = pthread_mutexattr_destroy(&mutexattr); - assert(ret == 0); - THRIFT_UNUSED_VARIABLE(ret); + THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr)); + THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind)); + THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr)); + THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr)); } #endif @@ -258,6 +262,12 @@ void Mutex::ADAPTIVE_INITIALIZER(void* arg) { } #endif +#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP +void Mutex::ERRORCHECK_INITIALIZER(void* arg) { + init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK); +} +#endif + #ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP void Mutex::RECURSIVE_INITIALIZER(void* arg) { init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP); @@ -275,40 +285,36 @@ public: #ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING profileTime_ = 0; #endif - int ret = pthread_rwlock_init(&rw_lock_, NULL); - THRIFT_UNUSED_VARIABLE(ret); - assert(ret == 0); + THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL)); initialized_ = true; } ~impl() { if (initialized_) { initialized_ = false; - int ret = pthread_rwlock_destroy(&rw_lock_); - THRIFT_UNUSED_VARIABLE(ret); - assert(ret == 0); + ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_)); } } void acquireRead() const { PROFILE_MUTEX_START_LOCK(); - pthread_rwlock_rdlock(&rw_lock_); + THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_)); PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path } void acquireWrite() const { PROFILE_MUTEX_START_LOCK(); - pthread_rwlock_wrlock(&rw_lock_); + THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_)); PROFILE_MUTEX_LOCKED(); } - bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); } + bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); } - bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); } + bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); } void release() const { PROFILE_MUTEX_START_UNLOCK(); - pthread_rwlock_unlock(&rw_lock_); + THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_)); PROFILE_MUTEX_UNLOCKED(); } diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h index 6f892dcd3..e1e395edd 100644 --- a/lib/cpp/src/thrift/concurrency/Mutex.h +++ b/lib/cpp/src/thrift/concurrency/Mutex.h @@ -54,6 +54,11 @@ void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callbac #endif /** + * NOTE: All mutex implementations throw an exception on failure. See each + * specific implementation to understand the exception type(s) used. + */ + +/** * A simple mutex class * * @version $Id:$ @@ -64,6 +69,7 @@ public: Mutex(Initializer init = DEFAULT_INITIALIZER); virtual ~Mutex() {} + virtual void lock() const; virtual bool trylock() const; virtual bool timedlock(int64_t milliseconds) const; @@ -71,8 +77,11 @@ public: void* getUnderlyingImpl() const; - static void DEFAULT_INITIALIZER(void*); + // If you attempt to use one of these and it fails to link, it means + // your version of pthreads does not support it - try another one. static void ADAPTIVE_INITIALIZER(void*); + static void DEFAULT_INITIALIZER(void*); + static void ERRORCHECK_INITIALIZER(void*); static void RECURSIVE_INITIALIZER(void*); private: diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp index 49c18d8d4..e0f79fa37 100644 --- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp +++ b/lib/cpp/src/thrift/concurrency/StdMutex.cpp @@ -33,6 +33,8 @@ namespace concurrency { /** * Implementation of Mutex class using C++11 std::timed_mutex * + * Methods throw std::system_error on error. + * * @version $Id:$ */ class Mutex::impl : public std::timed_mutex {}; diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp index c4726dde3..88cd59a24 100644 --- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp @@ -30,10 +30,6 @@ #include <deque> #include <set> -#if defined(DEBUG) -#include <iostream> -#endif // defined(DEBUG) - namespace apache { namespace thrift { namespace concurrency { diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt index ef3d417a3..6d4aa5ecd 100644 --- a/lib/cpp/test/CMakeLists.txt +++ b/lib/cpp/test/CMakeLists.txt @@ -80,7 +80,8 @@ set(UnitTest_SOURCES ) if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS AND NOT MSVC AND NOT MINGW) - list(APPEND UnitTest_SOURCES RWMutexStarveTest.cpp) + list(APPEND UnitTest_SOURCES concurrency/MutexTest.cpp) + list(APPEND UnitTest_SOURCES concurrency/RWMutexStarveTest.cpp) endif() add_executable(UnitTests ${UnitTest_SOURCES}) diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index d387297df..f61cff191 100755 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -123,7 +123,8 @@ UnitTests_SOURCES = \ if !WITH_BOOSTTHREADS UnitTests_SOURCES += \ - RWMutexStarveTest.cpp + concurrency/MutexTest.cpp \ + concurrency/RWMutexStarveTest.cpp endif UnitTests_LDADD = \ diff --git a/lib/cpp/test/concurrency/MutexTest.cpp b/lib/cpp/test/concurrency/MutexTest.cpp new file mode 100644 index 000000000..781ec1a40 --- /dev/null +++ b/lib/cpp/test/concurrency/MutexTest.cpp @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// This is linked into the UnitTests test executable + +#include <boost/test/unit_test.hpp> + +#include "thrift/concurrency/Exception.h" +#include "thrift/concurrency/Mutex.h" + +using boost::unit_test::test_suite; +using boost::unit_test::framework::master_test_suite; + +using namespace apache::thrift::concurrency; + +struct LFAT +{ + LFAT() + : uut(Mutex::ERRORCHECK_INITIALIZER) + { + BOOST_CHECK_EQUAL(0, pthread_mutex_init(&mx, 0)); + BOOST_CHECK_EQUAL(0, pthread_cond_init(&cv, 0)); + } + + Mutex uut; + pthread_mutex_t mx; + pthread_cond_t cv; +}; + +// Helper for testing mutex behavior when locked by another thread +void * lockFromAnotherThread(void *ptr) +{ + struct LFAT *lfat = (LFAT *)ptr; + BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat->mx)); // synchronize with testing thread + BOOST_CHECK_NO_THROW( lfat->uut.lock()); + BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat->cv)); // tell testing thread we have locked the mutex + BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat->cv, &lfat->mx)); // wait for testing thread to signal condition variable telling us to unlock + BOOST_CHECK_NO_THROW( lfat->uut.unlock()); + return ptr; // testing thread should join to ensure completeness +} + +BOOST_AUTO_TEST_SUITE(MutexTest) + +BOOST_AUTO_TEST_CASE(happy_path) +{ + Mutex uut(Mutex::ERRORCHECK_INITIALIZER); // needed to test unlocking twice without undefined behavior + + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_THROW ( uut.lock(), SystemResourceException); // EDEADLK (this thread owns it) + BOOST_CHECK_NO_THROW( uut.unlock()); +} + +BOOST_AUTO_TEST_CASE(recursive_happy_path) +{ + Mutex uut(Mutex::RECURSIVE_INITIALIZER); + + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_NO_THROW( uut.unlock()); + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_NO_THROW( uut.unlock()); + BOOST_CHECK_NO_THROW( uut.lock()); + BOOST_CHECK_NO_THROW( uut.unlock()); + BOOST_CHECK_NO_THROW( uut.unlock()); + BOOST_CHECK_NO_THROW( uut.unlock()); +} + +BOOST_AUTO_TEST_CASE(trylock) +{ + Mutex uut(Mutex::ADAPTIVE_INITIALIZER); // just using another initializer for coverage + + BOOST_CHECK ( uut.trylock()); + BOOST_CHECK (!uut.trylock()); + BOOST_CHECK_NO_THROW( uut.unlock()); +} + +BOOST_AUTO_TEST_CASE(timedlock) +{ + pthread_t th; + struct LFAT lfat; + + BOOST_CHECK ( lfat.uut.timedlock(100)); + BOOST_CHECK_THROW ( lfat.uut.timedlock(100), + SystemResourceException); // EDEADLK (current thread owns mutex - logic error) + BOOST_CHECK_NO_THROW( lfat.uut.unlock()); + + BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat.mx)); // synchronize with helper thread + BOOST_CHECK_EQUAL (0, pthread_create(&th, NULL, + lockFromAnotherThread, &lfat)); // create helper thread + BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat.cv, &lfat.mx)); // wait for helper thread to lock mutex + + BOOST_CHECK (!lfat.uut.timedlock(100)); // false: another thread owns the lock + + BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat.cv)); // tell helper thread we are done + BOOST_CHECK_EQUAL (0, pthread_mutex_unlock(&lfat.mx)); // let helper thread clean up + BOOST_CHECK_EQUAL (0, pthread_join(th, 0)); // wait for testing thread to unlock and be done +} + +BOOST_AUTO_TEST_CASE(underlying) +{ + Mutex uut; + + BOOST_CHECK ( uut.getUnderlyingImpl()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/cpp/test/RWMutexStarveTest.cpp b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp index 32c1531be..63d780fa1 100644 --- a/lib/cpp/test/RWMutexStarveTest.cpp +++ b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp @@ -17,8 +17,7 @@ * under the License. */ -#include <iostream> -#include <unistd.h> +// This is linked into the UnitTests test executable #include <boost/shared_ptr.hpp> #include <boost/test/unit_test.hpp> |