summaryrefslogtreecommitdiff
path: root/lib/cpp/src/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/concurrency')
-rw-r--r--lib/cpp/src/concurrency/Exception.h60
-rw-r--r--lib/cpp/src/concurrency/FunctionRunner.h77
-rw-r--r--lib/cpp/src/concurrency/Monitor.cpp137
-rw-r--r--lib/cpp/src/concurrency/Monitor.h84
-rw-r--r--lib/cpp/src/concurrency/Mutex.cpp160
-rw-r--r--lib/cpp/src/concurrency/Mutex.h112
-rw-r--r--lib/cpp/src/concurrency/PosixThreadFactory.cpp308
-rw-r--r--lib/cpp/src/concurrency/PosixThreadFactory.h130
-rw-r--r--lib/cpp/src/concurrency/Thread.h125
-rw-r--r--lib/cpp/src/concurrency/ThreadManager.cpp493
-rw-r--r--lib/cpp/src/concurrency/ThreadManager.h169
-rw-r--r--lib/cpp/src/concurrency/TimerManager.cpp284
-rw-r--r--lib/cpp/src/concurrency/TimerManager.h120
-rw-r--r--lib/cpp/src/concurrency/Util.cpp55
-rw-r--r--lib/cpp/src/concurrency/Util.h100
-rw-r--r--lib/cpp/src/concurrency/test/Tests.cpp155
-rw-r--r--lib/cpp/src/concurrency/test/ThreadFactoryTests.h357
-rw-r--r--lib/cpp/src/concurrency/test/ThreadManagerTests.h366
-rw-r--r--lib/cpp/src/concurrency/test/TimerManagerTests.h142
19 files changed, 3434 insertions, 0 deletions
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
new file mode 100644
index 000000000..ec4662976
--- /dev/null
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
+#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1
+
+#include <exception>
+#include <Thrift.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+class NoSuchTaskException : public apache::thrift::TException {};
+
+class UncancellableTaskException : public apache::thrift::TException {};
+
+class InvalidArgumentException : public apache::thrift::TException {};
+
+class IllegalStateException : public apache::thrift::TException {};
+
+class TimedOutException : public apache::thrift::TException {
+public:
+ TimedOutException():TException("TimedOutException"){};
+ TimedOutException(const std::string& message ) :
+ TException(message) {}
+};
+
+class TooManyPendingTasksException : public apache::thrift::TException {
+public:
+ TooManyPendingTasksException():TException("TooManyPendingTasksException"){};
+ TooManyPendingTasksException(const std::string& message ) :
+ TException(message) {}
+};
+
+class SystemResourceException : public apache::thrift::TException {
+public:
+ SystemResourceException() {}
+
+ SystemResourceException(const std::string& message) :
+ TException(message) {}
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
diff --git a/lib/cpp/src/concurrency/FunctionRunner.h b/lib/cpp/src/concurrency/FunctionRunner.h
new file mode 100644
index 000000000..221692767
--- /dev/null
+++ b/lib/cpp/src/concurrency/FunctionRunner.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
+#define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1
+
+#include <tr1/functional>
+#include "thrift/lib/cpp/concurrency/Thread.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Convenient implementation of Runnable that will execute arbitrary callbacks.
+ * Interfaces are provided to accept both a generic 'void(void)' callback, and
+ * a 'void* (void*)' pthread_create-style callback.
+ *
+ * Example use:
+ * void* my_thread_main(void* arg);
+ * shared_ptr<ThreadFactory> factory = ...;
+ * shared_ptr<Thread> thread =
+ * factory->newThread(shared_ptr<FunctionRunner>(
+ * new FunctionRunner(my_thread_main, some_argument)));
+ * thread->start();
+ *
+ *
+ */
+
+class FunctionRunner : public Runnable {
+ public:
+ // This is the type of callback 'pthread_create()' expects.
+ typedef void* (*PthreadFuncPtr)(void *arg);
+ // This a fully-generic void(void) callback for custom bindings.
+ typedef std::tr1::function<void()> VoidFunc;
+
+ /**
+ * Given a 'pthread_create' style callback, this FunctionRunner will
+ * execute the given callback. Note that the 'void*' return value is ignored.
+ */
+ FunctionRunner(PthreadFuncPtr func, void* arg)
+ : func_(std::tr1::bind(func, arg))
+ { }
+
+ /**
+ * Given a generic callback, this FunctionRunner will execute it.
+ */
+ FunctionRunner(const VoidFunc& cob)
+ : func_(cob)
+ { }
+
+
+ void run() {
+ func_();
+ }
+
+ private:
+ VoidFunc func_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp
new file mode 100644
index 000000000..2055caa95
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.cpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Monitor.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <errno.h>
+
+#include <iostream>
+
+#include <pthread.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Monitor implementation using the POSIX pthread library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl {
+
+ public:
+
+ Impl() :
+ mutexInitialized_(false),
+ condInitialized_(false) {
+
+ if (pthread_mutex_init(&pthread_mutex_, NULL) == 0) {
+ mutexInitialized_ = true;
+
+ if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
+ condInitialized_ = true;
+ }
+ }
+
+ if (!mutexInitialized_ || !condInitialized_) {
+ cleanup();
+ throw SystemResourceException();
+ }
+ }
+
+ ~Impl() { cleanup(); }
+
+ void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+
+ void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+
+ void wait(int64_t timeout) const {
+
+ // XXX Need to assert that caller owns mutex
+ assert(timeout >= 0LL);
+ if (timeout == 0LL) {
+ int iret = pthread_cond_wait(&pthread_cond_, &pthread_mutex_);
+ assert(iret == 0);
+ } else {
+ struct timespec abstime;
+ int64_t now = Util::currentTime();
+ Util::toTimespec(abstime, now + timeout);
+ int result = pthread_cond_timedwait(&pthread_cond_,
+ &pthread_mutex_,
+ &abstime);
+ if (result == ETIMEDOUT) {
+ // pthread_cond_timedwait has been observed to return early on
+ // various platforms, so comment out this assert.
+ //assert(Util::currentTime() >= (now + timeout));
+ throw TimedOutException();
+ }
+ }
+ }
+
+ void notify() {
+ // XXX Need to assert that caller owns mutex
+ int iret = pthread_cond_signal(&pthread_cond_);
+ assert(iret == 0);
+ }
+
+ void notifyAll() {
+ // XXX Need to assert that caller owns mutex
+ int iret = pthread_cond_broadcast(&pthread_cond_);
+ assert(iret == 0);
+ }
+
+ private:
+
+ void cleanup() {
+ if (mutexInitialized_) {
+ mutexInitialized_ = false;
+ int iret = pthread_mutex_destroy(&pthread_mutex_);
+ assert(iret == 0);
+ }
+
+ if (condInitialized_) {
+ condInitialized_ = false;
+ int iret = pthread_cond_destroy(&pthread_cond_);
+ assert(iret == 0);
+ }
+ }
+
+ mutable pthread_mutex_t pthread_mutex_;
+ mutable bool mutexInitialized_;
+ mutable pthread_cond_t pthread_cond_;
+ mutable bool condInitialized_;
+};
+
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+
+Monitor::~Monitor() { delete impl_; }
+
+void Monitor::lock() const { impl_->lock(); }
+
+void Monitor::unlock() const { impl_->unlock(); }
+
+void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
+
+void Monitor::notify() const { impl_->notify(); }
+
+void Monitor::notifyAll() const { impl_->notifyAll(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
new file mode 100644
index 000000000..234bf3269
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
+#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
+
+#include "Exception.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A monitor is a combination mutex and condition-event. Waiting and
+ * notifying condition events requires that the caller own the mutex. Mutex
+ * lock and unlock operations can be performed independently of condition
+ * events. This is more or less analogous to java.lang.Object multi-thread
+ * operations
+ *
+ * Note that all methods are const. Monitors implement logical constness, not
+ * bit constness. This allows const methods to call monitor methods without
+ * needing to cast away constness or change to non-const signatures.
+ *
+ * @version $Id:$
+ */
+class Monitor {
+
+ public:
+
+ Monitor();
+
+ virtual ~Monitor();
+
+ virtual void lock() const;
+
+ virtual void unlock() const;
+
+ virtual void wait(int64_t timeout=0LL) const;
+
+ virtual void notify() const;
+
+ virtual void notifyAll() const;
+
+ private:
+
+ class Impl;
+
+ Impl* impl_;
+};
+
+class Synchronized {
+ public:
+
+ Synchronized(const Monitor& value) :
+ monitor_(value) {
+ monitor_.lock();
+ }
+
+ ~Synchronized() {
+ monitor_.unlock();
+ }
+
+ private:
+ const Monitor& monitor_;
+};
+
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
new file mode 100644
index 000000000..045dbdfe2
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Mutex.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+using boost::shared_ptr;
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Implementation of Mutex class using POSIX mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl {
+ public:
+ impl(Initializer init) : initialized_(false) {
+ init(&pthread_mutex_);
+ initialized_ = true;
+ }
+
+ ~impl() {
+ if (initialized_) {
+ initialized_ = false;
+ int ret = pthread_mutex_destroy(&pthread_mutex_);
+ assert(ret == 0);
+ }
+ }
+
+ void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+
+ bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+
+ void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+
+ private:
+ mutable pthread_mutex_t pthread_mutex_;
+ mutable bool initialized_;
+};
+
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
+
+void Mutex::lock() const { impl_->lock(); }
+
+bool Mutex::trylock() const { return impl_->trylock(); }
+
+void Mutex::unlock() const { impl_->unlock(); }
+
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+ pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
+ int ret = pthread_mutex_init(pthread_mutex, NULL);
+ assert(ret == 0);
+}
+
+static void init_with_kind(pthread_mutex_t* mutex, int kind) {
+ pthread_mutexattr_t mutexattr;
+ int ret = pthread_mutexattr_init(&mutexattr);
+ assert(ret == 0);
+
+ // Apparently, this can fail. Should we really be aborting?
+ ret = pthread_mutexattr_settype(&mutexattr, kind);
+ assert(ret == 0);
+
+ ret = pthread_mutex_init(mutex, &mutexattr);
+ assert(ret == 0);
+
+ ret = pthread_mutexattr_destroy(&mutexattr);
+ assert(ret == 0);
+}
+
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
+ // From mysql source: mysys/my_thr_init.c
+ // Set mutex type to "fast" a.k.a "adaptive"
+ //
+ // In this case the thread may steal the mutex from some other thread
+ // that is waiting for the same mutex. This will save us some
+ // context switches but may cause a thread to 'starve forever' while
+ // waiting for the mutex (not likely if the code within the mutex is
+ // short).
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP);
+}
+#endif
+
+#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
+void Mutex::RECURSIVE_INITIALIZER(void* arg) {
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
+}
+#endif
+
+
+/**
+ * Implementation of ReadWriteMutex class using POSIX rw lock
+ *
+ * @version $Id:$
+ */
+class ReadWriteMutex::impl {
+public:
+ impl() : initialized_(false) {
+ int ret = pthread_rwlock_init(&rw_lock_, NULL);
+ assert(ret == 0);
+ initialized_ = true;
+ }
+
+ ~impl() {
+ if(initialized_) {
+ initialized_ = false;
+ int ret = pthread_rwlock_destroy(&rw_lock_);
+ assert(ret == 0);
+ }
+ }
+
+ void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+
+ void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+
+ bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
+
+ bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
+
+ void release() const { pthread_rwlock_unlock(&rw_lock_); }
+
+private:
+ mutable pthread_rwlock_t rw_lock_;
+ mutable bool initialized_;
+};
+
+ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
+
+void ReadWriteMutex::acquireRead() const { impl_->acquireRead(); }
+
+void ReadWriteMutex::acquireWrite() const { impl_->acquireWrite(); }
+
+bool ReadWriteMutex::attemptRead() const { return impl_->attemptRead(); }
+
+bool ReadWriteMutex::attemptWrite() const { return impl_->attemptWrite(); }
+
+void ReadWriteMutex::release() const { impl_->release(); }
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
new file mode 100644
index 000000000..884412bea
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MUTEX_H_
+#define _THRIFT_CONCURRENCY_MUTEX_H_ 1
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A simple mutex class
+ *
+ * @version $Id:$
+ */
+class Mutex {
+ public:
+ typedef void (*Initializer)(void*);
+
+ Mutex(Initializer init = DEFAULT_INITIALIZER);
+ virtual ~Mutex() {}
+ virtual void lock() const;
+ virtual bool trylock() const;
+ virtual void unlock() const;
+
+ static void DEFAULT_INITIALIZER(void*);
+ static void ADAPTIVE_INITIALIZER(void*);
+ static void RECURSIVE_INITIALIZER(void*);
+
+ private:
+
+ class impl;
+ boost::shared_ptr<impl> impl_;
+};
+
+class ReadWriteMutex {
+public:
+ ReadWriteMutex();
+ virtual ~ReadWriteMutex() {}
+
+ // these get the lock and block until it is done successfully
+ virtual void acquireRead() const;
+ virtual void acquireWrite() const;
+
+ // these attempt to get the lock, returning false immediately if they fail
+ virtual bool attemptRead() const;
+ virtual bool attemptWrite() const;
+
+ // this releases both read and write locks
+ virtual void release() const;
+
+private:
+
+ class impl;
+ boost::shared_ptr<impl> impl_;
+};
+
+class Guard {
+ public:
+ Guard(const Mutex& value) : mutex_(value) {
+ mutex_.lock();
+ }
+ ~Guard() {
+ mutex_.unlock();
+ }
+
+ private:
+ const Mutex& mutex_;
+};
+
+class RWGuard {
+ public:
+ RWGuard(const ReadWriteMutex& value, bool write = 0) : rw_mutex_(value) {
+ if (write) {
+ rw_mutex_.acquireWrite();
+ } else {
+ rw_mutex_.acquireRead();
+ }
+ }
+ ~RWGuard() {
+ rw_mutex_.release();
+ }
+ private:
+ const ReadWriteMutex& rw_mutex_;
+};
+
+
+// A little hack to prevent someone from trying to do "Guard(m);"
+// Sorry for polluting the global namespace, but I think it's worth it.
+#define Guard(m) incorrect_use_of_Guard(m)
+#define RWGuard(m) incorrect_use_of_RWGuard(m)
+
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
new file mode 100644
index 000000000..e48dce39e
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "PosixThreadFactory.h"
+#include "Exception.h"
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+# include <google/profiler.h>
+#endif
+
+#include <assert.h>
+#include <pthread.h>
+
+#include <iostream>
+
+#include <boost/weak_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::weak_ptr;
+
+/**
+ * The POSIX thread class.
+ *
+ * @version $Id:$
+ */
+class PthreadThread: public Thread {
+ public:
+
+ enum STATE {
+ uninitialized,
+ starting,
+ started,
+ stopping,
+ stopped
+ };
+
+ static const int MB = 1024 * 1024;
+
+ static void* threadMain(void* arg);
+
+ private:
+ pthread_t pthread_;
+ STATE state_;
+ int policy_;
+ int priority_;
+ int stackSize_;
+ weak_ptr<PthreadThread> self_;
+ bool detached_;
+
+ public:
+
+ PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
+ pthread_(0),
+ state_(uninitialized),
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize),
+ detached_(detached) {
+
+ this->Thread::runnable(runnable);
+ }
+
+ ~PthreadThread() {
+ /* Nothing references this thread, if is is not detached, do a join
+ now, otherwise the thread-id and, possibly, other resources will
+ be leaked. */
+ if(!detached_) {
+ try {
+ join();
+ } catch(...) {
+ // We're really hosed.
+ }
+ }
+ }
+
+ void start() {
+ if (state_ != uninitialized) {
+ return;
+ }
+
+ pthread_attr_t thread_attr;
+ if (pthread_attr_init(&thread_attr) != 0) {
+ throw SystemResourceException("pthread_attr_init failed");
+ }
+
+ if(pthread_attr_setdetachstate(&thread_attr,
+ detached_ ?
+ PTHREAD_CREATE_DETACHED :
+ PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
+ }
+
+ // Set thread stack size
+ if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
+ throw SystemResourceException("pthread_attr_setstacksize failed");
+ }
+
+ // Set thread policy
+ if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+ throw SystemResourceException("pthread_attr_setschedpolicy failed");
+ }
+
+ struct sched_param sched_param;
+ sched_param.sched_priority = priority_;
+
+ // Set thread priority
+ if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+ throw SystemResourceException("pthread_attr_setschedparam failed");
+ }
+
+ // Create reference
+ shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
+ *selfRef = self_.lock();
+
+ state_ = starting;
+
+ if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+ throw SystemResourceException("pthread_create failed");
+ }
+ }
+
+ void join() {
+ if (!detached_ && state_ != uninitialized) {
+ void* ignore;
+ /* XXX
+ If join fails it is most likely due to the fact
+ that the last reference was the thread itself and cannot
+ join. This results in leaked threads and will eventually
+ cause the process to run out of thread resources.
+ We're beyond the point of throwing an exception. Not clear how
+ best to handle this. */
+ detached_ = pthread_join(pthread_, &ignore) == 0;
+ }
+ }
+
+ Thread::id_t getId() {
+ return (Thread::id_t)pthread_;
+ }
+
+ shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+ void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
+
+ void weakRef(shared_ptr<PthreadThread> self) {
+ assert(self.get() == this);
+ self_ = weak_ptr<PthreadThread>(self);
+ }
+};
+
+void* PthreadThread::threadMain(void* arg) {
+ shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
+ delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
+
+ if (thread == NULL) {
+ return (void*)0;
+ }
+
+ if (thread->state_ != starting) {
+ return (void*)0;
+ }
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+ ProfilerRegisterThread();
+#endif
+
+ thread->state_ = starting;
+ thread->runnable()->run();
+ if (thread->state_ != stopping && thread->state_ != stopped) {
+ thread->state_ = stopping;
+ }
+
+ return (void*)0;
+}
+
+/**
+ * POSIX Thread factory implementation
+ */
+class PosixThreadFactory::Impl {
+
+ private:
+ POLICY policy_;
+ PRIORITY priority_;
+ int stackSize_;
+ bool detached_;
+
+ /**
+ * Converts generic posix thread schedule policy enums into pthread
+ * API values.
+ */
+ static int toPthreadPolicy(POLICY policy) {
+ switch (policy) {
+ case OTHER:
+ return SCHED_OTHER;
+ case FIFO:
+ return SCHED_FIFO;
+ case ROUND_ROBIN:
+ return SCHED_RR;
+ }
+ return SCHED_OTHER;
+ }
+
+ /**
+ * Converts relative thread priorities to absolute value based on posix
+ * thread scheduler policy
+ *
+ * The idea is simply to divide up the priority range for the given policy
+ * into the correpsonding relative priority level (lowest..highest) and
+ * then pro-rate accordingly.
+ */
+ static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+ int pthread_policy = toPthreadPolicy(policy);
+ int min_priority = sched_get_priority_min(pthread_policy);
+ int max_priority = sched_get_priority_max(pthread_policy);
+ int quanta = (HIGHEST - LOWEST) + 1;
+ float stepsperquanta = (max_priority - min_priority) / quanta;
+
+ if (priority <= HIGHEST) {
+ return (int)(min_priority + stepsperquanta * priority);
+ } else {
+ // should never get here for priority increments.
+ assert(false);
+ return (int)(min_priority + stepsperquanta * NORMAL);
+ }
+ }
+
+ public:
+
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize),
+ detached_(detached) {}
+
+ /**
+ * Creates a new POSIX thread to run the runnable object
+ *
+ * @param runnable A runnable object
+ */
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
+ }
+
+ int getStackSize() const { return stackSize_; }
+
+ void setStackSize(int value) { stackSize_ = value; }
+
+ PRIORITY getPriority() const { return priority_; }
+
+ /**
+ * Sets priority.
+ *
+ * XXX
+ * Need to handle incremental priorities properly.
+ */
+ void setPriority(PRIORITY value) { priority_ = value; }
+
+ bool isDetached() const { return detached_; }
+
+ void setDetached(bool value) { detached_ = value; }
+
+ Thread::id_t getCurrentThreadId() const {
+ // TODO(dreiss): Stop using C-style casts.
+ return (id_t)pthread_self();
+ }
+
+};
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+
+int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
+
+void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
+
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
+
+bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
new file mode 100644
index 000000000..d6d83a3a1
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
+
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create posix threads
+ *
+ * @version $Id:$
+ */
+class PosixThreadFactory : public ThreadFactory {
+
+ public:
+
+ /**
+ * POSIX Thread scheduler policies
+ */
+ enum POLICY {
+ OTHER,
+ FIFO,
+ ROUND_ROBIN
+ };
+
+ /**
+ * POSIX Thread scheduler relative priorities,
+ *
+ * Absolute priority is determined by scheduler policy and OS. This
+ * enumeration specifies relative priorities such that one can specify a
+ * priority withing a giving scheduler policy without knowing the absolute
+ * value of the priority.
+ */
+ enum PRIORITY {
+ LOWEST = 0,
+ LOWER = 1,
+ LOW = 2,
+ NORMAL = 3,
+ HIGH = 4,
+ HIGHER = 5,
+ HIGHEST = 6,
+ INCREMENT = 7,
+ DECREMENT = 8
+ };
+
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * Threads are created with the specified policy, priority, stack-size and detachable-mode
+ * detached means the thread is free-running and will release all system resources the
+ * when it completes. A detachable thread is not joinable. The join method
+ * of a detachable thread will return immediately with no error.
+ *
+ * By default threads are not joinable.
+ */
+
+ PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=true);
+
+ // From ThreadFactory;
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Gets stack size for created threads
+ *
+ * @return int size in megabytes
+ */
+ virtual int getStackSize() const;
+
+ /**
+ * Sets stack size for created threads
+ *
+ * @param value size in megabytes
+ */
+ virtual void setStackSize(int value);
+
+ /**
+ * Gets priority relative to current policy
+ */
+ virtual PRIORITY getPriority() const;
+
+ /**
+ * Sets priority relative to current policy
+ */
+ virtual void setPriority(PRIORITY priority);
+
+ /**
+ * Sets detached mode of threads
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+ private:
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
new file mode 100644
index 000000000..d4282adbc
--- /dev/null
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREAD_H_
+#define _THRIFT_CONCURRENCY_THREAD_H_ 1
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+class Thread;
+
+/**
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ *
+ * @version $Id:$
+ */
+class Runnable {
+
+ public:
+ virtual ~Runnable() {};
+ virtual void run() = 0;
+
+ /**
+ * Gets the thread object that is hosting this runnable object - can return
+ * an empty boost::shared pointer if no references remain on thet thread object
+ */
+ virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
+
+ /**
+ * Sets the thread that is executing this object. This is only meant for
+ * use by concrete implementations of Thread.
+ */
+ virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
+
+ private:
+ boost::weak_ptr<Thread> thread_;
+};
+
+/**
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * and ready to start execution. More or less analogous to java.lang.Thread
+ * (minus all the thread group, priority, mode and other baggage, since that
+ * is difficult to abstract across platforms and is left for platform-specific
+ * ThreadFactory implemtations to deal with
+ *
+ * @see apache::thrift::concurrency::ThreadFactory)
+ */
+class Thread {
+
+ public:
+
+ typedef uint64_t id_t;
+
+ virtual ~Thread() {};
+
+ /**
+ * Starts the thread. Does platform specific thread creation and
+ * configuration then invokes the run method of the Runnable object bound
+ * to this thread.
+ */
+ virtual void start() = 0;
+
+ /**
+ * Join this thread. Current thread blocks until this target thread
+ * completes.
+ */
+ virtual void join() = 0;
+
+ /**
+ * Gets the thread's platform-specific ID
+ */
+ virtual id_t getId() = 0;
+
+ /**
+ * Gets the runnable object this thread is hosting
+ */
+ virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
+
+ protected:
+ virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
+
+ private:
+ boost::shared_ptr<Runnable> _runnable;
+
+};
+
+/**
+ * Factory to create platform-specific thread object and bind them to Runnable
+ * object for execution
+ */
+class ThreadFactory {
+
+ public:
+ virtual ~ThreadFactory() {}
+ virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+ /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */
+
+ static const Thread::id_t unknown_thread_id;
+
+ virtual Thread::id_t getCurrentThreadId() const = 0;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
new file mode 100644
index 000000000..abfcf6e70
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ThreadManager.h"
+#include "Exception.h"
+#include "Monitor.h"
+
+#include <boost/shared_ptr.hpp>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * @version $Id:$
+ */
+class ThreadManager::Impl : public ThreadManager {
+
+ public:
+ Impl() :
+ workerCount_(0),
+ workerMaxCount_(0),
+ idleCount_(0),
+ pendingTaskCountMax_(0),
+ state_(ThreadManager::UNINITIALIZED) {}
+
+ ~Impl() { stop(); }
+
+ void start();
+
+ void stop() { stopImpl(false); }
+
+ void join() { stopImpl(true); }
+
+ const ThreadManager::STATE state() const {
+ return state_;
+ }
+
+ shared_ptr<ThreadFactory> threadFactory() const {
+ Synchronized s(monitor_);
+ return threadFactory_;
+ }
+
+ void threadFactory(shared_ptr<ThreadFactory> value) {
+ Synchronized s(monitor_);
+ threadFactory_ = value;
+ }
+
+ void addWorker(size_t value);
+
+ void removeWorker(size_t value);
+
+ size_t idleWorkerCount() const {
+ return idleCount_;
+ }
+
+ size_t workerCount() const {
+ Synchronized s(monitor_);
+ return workerCount_;
+ }
+
+ size_t pendingTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size();
+ }
+
+ size_t totalTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size() + workerCount_ - idleCount_;
+ }
+
+ size_t pendingTaskCountMax() const {
+ Synchronized s(monitor_);
+ return pendingTaskCountMax_;
+ }
+
+ void pendingTaskCountMax(const size_t value) {
+ Synchronized s(monitor_);
+ pendingTaskCountMax_ = value;
+ }
+
+ bool canSleep();
+
+ void add(shared_ptr<Runnable> value, int64_t timeout);
+
+ void remove(shared_ptr<Runnable> task);
+
+private:
+ void stopImpl(bool join);
+
+ size_t workerCount_;
+ size_t workerMaxCount_;
+ size_t idleCount_;
+ size_t pendingTaskCountMax_;
+
+ ThreadManager::STATE state_;
+ shared_ptr<ThreadFactory> threadFactory_;
+
+
+ friend class ThreadManager::Task;
+ std::queue<shared_ptr<Task> > tasks_;
+ Monitor monitor_;
+ Monitor workerMonitor_;
+
+ friend class ThreadManager::Worker;
+ std::set<shared_ptr<Thread> > workers_;
+ std::set<shared_ptr<Thread> > deadWorkers_;
+ std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
+};
+
+class ThreadManager::Task : public Runnable {
+
+ public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(shared_ptr<Runnable> runnable) :
+ runnable_(runnable),
+ state_(WAITING) {}
+
+ ~Task() {}
+
+ void run() {
+ if (state_ == EXECUTING) {
+ runnable_->run();
+ state_ = COMPLETE;
+ }
+ }
+
+ private:
+ shared_ptr<Runnable> runnable_;
+ friend class ThreadManager::Worker;
+ STATE state_;
+};
+
+class ThreadManager::Worker: public Runnable {
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ public:
+ Worker(ThreadManager::Impl* manager) :
+ manager_(manager),
+ state_(UNINITIALIZED),
+ idle_(false) {}
+
+ ~Worker() {}
+
+ private:
+ bool isActive() const {
+ return
+ (manager_->workerCount_ <= manager_->workerMaxCount_) ||
+ (manager_->state_ == JOINING && !manager_->tasks_.empty());
+ }
+
+ public:
+ /**
+ * Worker entry point
+ *
+ * As long as worker thread is running, pull tasks off the task queue and
+ * execute.
+ */
+ void run() {
+ bool active = false;
+ bool notifyManager = false;
+
+ /**
+ * Increment worker semaphore and notify manager if worker count reached
+ * desired max
+ *
+ * Note: We have to release the monitor and acquire the workerMonitor
+ * since that is what the manager blocks on for worker add/remove
+ */
+ {
+ Synchronized s(manager_->monitor_);
+ active = manager_->workerCount_ < manager_->workerMaxCount_;
+ if (active) {
+ manager_->workerCount_++;
+ notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+ }
+ }
+
+ if (notifyManager) {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->workerMonitor_.notify();
+ notifyManager = false;
+ }
+
+ while (active) {
+ shared_ptr<ThreadManager::Task> task;
+
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+ * the manager will see it.
+ */
+ {
+ Synchronized s(manager_->monitor_);
+ active = isActive();
+
+ while (active && manager_->tasks_.empty()) {
+ manager_->idleCount_++;
+ idle_ = true;
+ manager_->monitor_.wait();
+ active = isActive();
+ idle_ = false;
+ manager_->idleCount_--;
+ }
+
+ if (active) {
+ if (!manager_->tasks_.empty()) {
+ task = manager_->tasks_.front();
+ manager_->tasks_.pop();
+ if (task->state_ == ThreadManager::Task::WAITING) {
+ task->state_ = ThreadManager::Task::EXECUTING;
+ }
+
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if (manager_->pendingTaskCountMax_ != 0 &&
+ manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
+ manager_->monitor_.notify();
+ }
+ }
+ } else {
+ idle_ = true;
+ manager_->workerCount_--;
+ notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
+ }
+ }
+
+ if (task != NULL) {
+ if (task->state_ == ThreadManager::Task::EXECUTING) {
+ try {
+ task->run();
+ } catch(...) {
+ // XXX need to log this
+ }
+ }
+ }
+ }
+
+ {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->deadWorkers_.insert(this->thread());
+ if (notifyManager) {
+ manager_->workerMonitor_.notify();
+ }
+ }
+
+ return;
+ }
+
+ private:
+ ThreadManager::Impl* manager_;
+ friend class ThreadManager::Impl;
+ STATE state_;
+ bool idle_;
+};
+
+
+ void ThreadManager::Impl::addWorker(size_t value) {
+ std::set<shared_ptr<Thread> > newThreads;
+ for (size_t ix = 0; ix < value; ix++) {
+ class ThreadManager::Worker;
+ shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
+ newThreads.insert(threadFactory_->newThread(worker));
+ }
+
+ {
+ Synchronized s(monitor_);
+ workerMaxCount_ += value;
+ workers_.insert(newThreads.begin(), newThreads.end());
+ }
+
+ for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+ shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
+ worker->state_ = ThreadManager::Worker::STARTING;
+ (*ix)->start();
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
+ }
+
+ {
+ Synchronized s(workerMonitor_);
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
+ }
+ }
+}
+
+void ThreadManager::Impl::start() {
+
+ if (state_ == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {
+ Synchronized s(monitor_);
+ if (state_ == ThreadManager::UNINITIALIZED) {
+ if (threadFactory_ == NULL) {
+ throw InvalidArgumentException();
+ }
+ state_ = ThreadManager::STARTED;
+ monitor_.notifyAll();
+ }
+
+ while (state_ == STARTING) {
+ monitor_.wait();
+ }
+ }
+}
+
+void ThreadManager::Impl::stopImpl(bool join) {
+ bool doStop = false;
+ if (state_ == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {
+ Synchronized s(monitor_);
+ if (state_ != ThreadManager::STOPPING &&
+ state_ != ThreadManager::JOINING &&
+ state_ != ThreadManager::STOPPED) {
+ doStop = true;
+ state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
+ }
+ }
+
+ if (doStop) {
+ removeWorker(workerCount_);
+ }
+
+ // XXX
+ // should be able to block here for transition to STOPPED since we're no
+ // using shared_ptrs
+
+ {
+ Synchronized s(monitor_);
+ state_ = ThreadManager::STOPPED;
+ }
+
+}
+
+void ThreadManager::Impl::removeWorker(size_t value) {
+ std::set<shared_ptr<Thread> > removedThreads;
+ {
+ Synchronized s(monitor_);
+ if (value > workerMaxCount_) {
+ throw InvalidArgumentException();
+ }
+
+ workerMaxCount_ -= value;
+
+ if (idleCount_ < value) {
+ for (size_t ix = 0; ix < idleCount_; ix++) {
+ monitor_.notify();
+ }
+ } else {
+ monitor_.notifyAll();
+ }
+ }
+
+ {
+ Synchronized s(workerMonitor_);
+
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
+ }
+
+ for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
+ workers_.erase(*ix);
+ idMap_.erase((*ix)->getId());
+ }
+
+ deadWorkers_.clear();
+ }
+}
+
+ bool ThreadManager::Impl::canSleep() {
+ const Thread::id_t id = threadFactory_->getCurrentThreadId();
+ return idMap_.find(id) == idMap_.end();
+ }
+
+ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
+ Synchronized s(monitor_);
+
+ if (state_ != ThreadManager::STARTED) {
+ throw IllegalStateException();
+ }
+
+ if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+ if (canSleep() && timeout >= 0) {
+ while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+ monitor_.wait(timeout);
+ }
+ } else {
+ throw TooManyPendingTasksException();
+ }
+ }
+
+ tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+
+ // If idle thread is available notify it, otherwise all worker threads are
+ // running and will get around to this task in time.
+ if (idleCount_ > 0) {
+ monitor_.notify();
+ }
+ }
+
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
+ Synchronized s(monitor_);
+ if (state_ != ThreadManager::STARTED) {
+ throw IllegalStateException();
+ }
+}
+
+class SimpleThreadManager : public ThreadManager::Impl {
+
+ public:
+ SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
+ workerCount_(workerCount),
+ pendingTaskCountMax_(pendingTaskCountMax),
+ firstTime_(true) {
+ }
+
+ void start() {
+ ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
+ ThreadManager::Impl::start();
+ addWorker(workerCount_);
+ }
+
+ private:
+ const size_t workerCount_;
+ const size_t pendingTaskCountMax_;
+ bool firstTime_;
+ Monitor monitor_;
+};
+
+
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+ return shared_ptr<ThreadManager>(new ThreadManager::Impl());
+}
+
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
+}
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
new file mode 100644
index 000000000..6e5a17817
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
+#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <sys/types.h>
+#include "Thread.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Thread Pool Manager and related classes
+ *
+ * @version $Id:$
+ */
+class ThreadManager;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * It maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times and informs the PoolPolicy
+ * object bound to instances of this manager of interesting transitions. It is
+ * then up the PoolPolicy object to decide if the thread pool size needs to be
+ * adjusted and call this object addWorker and removeWorker methods to make
+ * changes.
+ *
+ * This design allows different policy implementations to used this code to
+ * handle basic worker thread management and worker task execution and focus on
+ * policy issues. The simplest policy, StaticPolicy, does nothing other than
+ * create a fixed number of threads.
+ */
+class ThreadManager {
+
+ protected:
+ ThreadManager() {}
+
+ public:
+ virtual ~ThreadManager() {}
+
+ /**
+ * Starts the thread manager. Verifies all attributes have been properly
+ * initialized, then allocates necessary resources to begin operation
+ */
+ virtual void start() = 0;
+
+ /**
+ * Stops the thread manager. Aborts all remaining unprocessed task, shuts
+ * down all created worker threads, and realeases all allocated resources.
+ * This method blocks for all worker threads to complete, thus it can
+ * potentially block forever if a worker thread is running a task that
+ * won't terminate.
+ */
+ virtual void stop() = 0;
+
+ /**
+ * Joins the thread manager. This is the same as stop, except that it will
+ * block until all the workers have finished their work. At that point
+ * the ThreadManager will transition into the STOPPED state.
+ */
+ virtual void join() = 0;
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ JOINING,
+ STOPPING,
+ STOPPED
+ };
+
+ virtual const STATE state() const = 0;
+
+ virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
+
+ virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
+
+ virtual void addWorker(size_t value=1) = 0;
+
+ virtual void removeWorker(size_t value=1) = 0;
+
+ /**
+ * Gets the current number of idle worker threads
+ */
+ virtual size_t idleWorkerCount() const = 0;
+
+ /**
+ * Gets the current number of total worker threads
+ */
+ virtual size_t workerCount() const = 0;
+
+ /**
+ * Gets the current number of pending tasks
+ */
+ virtual size_t pendingTaskCount() const = 0;
+
+ /**
+ * Gets the current number of pending and executing tasks
+ */
+ virtual size_t totalTaskCount() const = 0;
+
+ /**
+ * Gets the maximum pending task count. 0 indicates no maximum
+ */
+ virtual size_t pendingTaskCountMax() const = 0;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
+ * is greater than or equalt to pendingTaskCountMax(). If this method is called in the
+ * context of a ThreadManager worker thread it will throw a
+ * TooManyPendingTasksException
+ *
+ * @param task The task to queue for execution
+ *
+ * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
+ * is specified. Specific cases:
+ * timeout = 0 : Wait forever to queue task.
+ * timeout = -1 : Return immediately if pending task count exceeds specified max
+ *
+ * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
+ */
+ virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
+
+ /**
+ * Removes a pending task
+ */
+ virtual void remove(boost::shared_ptr<Runnable> task) = 0;
+
+ static boost::shared_ptr<ThreadManager> newThreadManager();
+
+ /**
+ * Creates a simple thread manager the uses count number of worker threads and has
+ * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
+ * on pending tasks
+ */
+ static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0);
+
+ class Task;
+
+ class Worker;
+
+ class Impl;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
new file mode 100644
index 000000000..25515dc82
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TimerManager.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+
+typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
+typedef std::pair<task_iterator, task_iterator> task_range;
+
+/**
+ * TimerManager class
+ *
+ * @version $Id:$
+ */
+class TimerManager::Task : public Runnable {
+
+ public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(shared_ptr<Runnable> runnable) :
+ runnable_(runnable),
+ state_(WAITING) {}
+
+ ~Task() {
+ }
+
+ void run() {
+ if (state_ == EXECUTING) {
+ runnable_->run();
+ state_ = COMPLETE;
+ }
+ }
+
+ private:
+ shared_ptr<Runnable> runnable_;
+ class TimerManager::Dispatcher;
+ friend class TimerManager::Dispatcher;
+ STATE state_;
+};
+
+class TimerManager::Dispatcher: public Runnable {
+
+ public:
+ Dispatcher(TimerManager* manager) :
+ manager_(manager) {}
+
+ ~Dispatcher() {}
+
+ /**
+ * Dispatcher entry point
+ *
+ * As long as dispatcher thread is running, pull tasks off the task taskMap_
+ * and execute.
+ */
+ void run() {
+ {
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STARTING) {
+ manager_->state_ = TimerManager::STARTED;
+ manager_->monitor_.notifyAll();
+ }
+ }
+
+ do {
+ std::set<shared_ptr<TimerManager::Task> > expiredTasks;
+ {
+ Synchronized s(manager_->monitor_);
+ task_iterator expiredTaskEnd;
+ int64_t now = Util::currentTime();
+ while (manager_->state_ == TimerManager::STARTED &&
+ (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
+ int64_t timeout = 0LL;
+ if (!manager_->taskMap_.empty()) {
+ timeout = manager_->taskMap_.begin()->first - now;
+ }
+ assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
+ try {
+ manager_->monitor_.wait(timeout);
+ } catch (TimedOutException &e) {}
+ now = Util::currentTime();
+ }
+
+ if (manager_->state_ == TimerManager::STARTED) {
+ for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
+ shared_ptr<TimerManager::Task> task = ix->second;
+ expiredTasks.insert(task);
+ if (task->state_ == TimerManager::Task::WAITING) {
+ task->state_ = TimerManager::Task::EXECUTING;
+ }
+ manager_->taskCount_--;
+ }
+ manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
+ }
+ }
+
+ for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ (*ix)->run();
+ }
+
+ } while (manager_->state_ == TimerManager::STARTED);
+
+ {
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STOPPING) {
+ manager_->state_ = TimerManager::STOPPED;
+ manager_->monitor_.notify();
+ }
+ }
+ return;
+ }
+
+ private:
+ TimerManager* manager_;
+ friend class TimerManager;
+};
+
+TimerManager::TimerManager() :
+ taskCount_(0),
+ state_(TimerManager::UNINITIALIZED),
+ dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
+}
+
+
+TimerManager::~TimerManager() {
+
+ // If we haven't been explicitly stopped, do so now. We don't need to grab
+ // the monitor here, since stop already takes care of reentrancy.
+
+ if (state_ != STOPPED) {
+ try {
+ stop();
+ } catch(...) {
+ throw;
+ // uhoh
+ }
+ }
+}
+
+void TimerManager::start() {
+ bool doStart = false;
+ {
+ Synchronized s(monitor_);
+ if (threadFactory_ == NULL) {
+ throw InvalidArgumentException();
+ }
+ if (state_ == TimerManager::UNINITIALIZED) {
+ state_ = TimerManager::STARTING;
+ doStart = true;
+ }
+ }
+
+ if (doStart) {
+ dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+ dispatcherThread_->start();
+ }
+
+ {
+ Synchronized s(monitor_);
+ while (state_ == TimerManager::STARTING) {
+ monitor_.wait();
+ }
+ assert(state_ != TimerManager::STARTING);
+ }
+}
+
+void TimerManager::stop() {
+ bool doStop = false;
+ {
+ Synchronized s(monitor_);
+ if (state_ == TimerManager::UNINITIALIZED) {
+ state_ = TimerManager::STOPPED;
+ } else if (state_ != STOPPING && state_ != STOPPED) {
+ doStop = true;
+ state_ = STOPPING;
+ monitor_.notifyAll();
+ }
+ while (state_ != STOPPED) {
+ monitor_.wait();
+ }
+ }
+
+ if (doStop) {
+ // Clean up any outstanding tasks
+ for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
+ taskMap_.erase(ix);
+ }
+
+ // Remove dispatcher's reference to us.
+ dispatcher_->manager_ = NULL;
+ }
+}
+
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
+ Synchronized s(monitor_);
+ return threadFactory_;
+}
+
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
+ Synchronized s(monitor_);
+ threadFactory_ = value;
+}
+
+size_t TimerManager::taskCount() const {
+ return taskCount_;
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
+ int64_t now = Util::currentTime();
+ timeout += now;
+
+ {
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
+
+ taskCount_++;
+ taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+
+ // If the task map was empty, or if we have an expiration that is earlier
+ // than any previously seen, kick the dispatcher so it can update its
+ // timeout
+ if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
+ monitor_.notify();
+ }
+ }
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
+
+ int64_t expiration;
+ Util::toMilliseconds(expiration, value);
+
+ int64_t now = Util::currentTime();
+
+ if (expiration < now) {
+ throw InvalidArgumentException();
+ }
+
+ add(task, expiration - now);
+}
+
+
+void TimerManager::remove(shared_ptr<Runnable> task) {
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
+}
+
+const TimerManager::STATE TimerManager::state() const { return state_; }
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
new file mode 100644
index 000000000..f3f799f93
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
+#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1
+
+#include "Exception.h"
+#include "Monitor.h"
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <time.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Timer Manager
+ *
+ * This class dispatches timer tasks when they fall due.
+ *
+ * @version $Id:$
+ */
+class TimerManager {
+
+ public:
+
+ TimerManager();
+
+ virtual ~TimerManager();
+
+ virtual boost::shared_ptr<const ThreadFactory> threadFactory() const;
+
+ virtual void threadFactory(boost::shared_ptr<const ThreadFactory> value);
+
+ /**
+ * Starts the timer manager service
+ *
+ * @throws IllegalArgumentException Missing thread factory attribute
+ */
+ virtual void start();
+
+ /**
+ * Stops the timer manager service
+ */
+ virtual void stop();
+
+ virtual size_t taskCount() const ;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Time in milliseconds to delay before executing task
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, int64_t timeout);
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Absolute time in the future to execute task.
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, const struct timespec& timeout);
+
+ /**
+ * Removes a pending task
+ *
+ * @throws NoSuchTaskException Specified task doesn't exist. It was either
+ * processed already or this call was made for a
+ * task that was never added to this timer
+ *
+ * @throws UncancellableTaskException Specified task is already being
+ * executed or has completed execution.
+ */
+ virtual void remove(boost::shared_ptr<Runnable> task);
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ virtual const STATE state() const;
+
+ private:
+ boost::shared_ptr<const ThreadFactory> threadFactory_;
+ class Task;
+ friend class Task;
+ std::multimap<int64_t, boost::shared_ptr<Task> > taskMap_;
+ size_t taskCount_;
+ Monitor monitor_;
+ STATE state_;
+ class Dispatcher;
+ friend class Dispatcher;
+ boost::shared_ptr<Dispatcher> dispatcher_;
+ boost::shared_ptr<Thread> dispatcherThread_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
diff --git a/lib/cpp/src/concurrency/Util.cpp b/lib/cpp/src/concurrency/Util.cpp
new file mode 100644
index 000000000..1c4493716
--- /dev/null
+++ b/lib/cpp/src/concurrency/Util.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Util.h"
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#if defined(HAVE_CLOCK_GETTIME)
+#include <time.h>
+#elif defined(HAVE_GETTIMEOFDAY)
+#include <sys/time.h>
+#endif // defined(HAVE_CLOCK_GETTIME)
+
+namespace apache { namespace thrift { namespace concurrency {
+
+const int64_t Util::currentTime() {
+ int64_t result;
+
+#if defined(HAVE_CLOCK_GETTIME)
+ struct timespec now;
+ int ret = clock_gettime(CLOCK_REALTIME, &now);
+ assert(ret == 0);
+ toMilliseconds(result, now);
+#elif defined(HAVE_GETTIMEOFDAY)
+ struct timeval now;
+ int ret = gettimeofday(&now, NULL);
+ assert(ret == 0);
+ toMilliseconds(result, now);
+#else
+#error "No high-precision clock is available."
+#endif // defined(HAVE_CLOCK_GETTIME)
+
+ return result;
+}
+
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
new file mode 100644
index 000000000..25fcc2086
--- /dev/null
+++ b/lib/cpp/src/concurrency/Util.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_UTIL_H_
+#define _THRIFT_CONCURRENCY_UTIL_H_ 1
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <time.h>
+#include <sys/time.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Utility methods
+ *
+ * This class contains basic utility methods for converting time formats,
+ * and other common platform-dependent concurrency operations.
+ * It should not be included in API headers for other concurrency library
+ * headers, since it will, by definition, pull in all sorts of horrid
+ * platform dependent crap. Rather it should be inluded directly in
+ * concurrency library implementation source.
+ *
+ * @version $Id:$
+ */
+class Util {
+
+ static const int64_t NS_PER_S = 1000000000LL;
+ static const int64_t US_PER_S = 1000000LL;
+ static const int64_t MS_PER_S = 1000LL;
+
+ static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+ static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
+
+ public:
+
+ /**
+ * Converts millisecond timestamp into a timespec struct
+ *
+ * @param struct timespec& result
+ * @param time or duration in milliseconds
+ */
+ static void toTimespec(struct timespec& result, int64_t value) {
+ result.tv_sec = value / MS_PER_S; // ms to s
+ result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
+ }
+
+ static void toTimeval(struct timeval& result, int64_t value) {
+ result.tv_sec = value / MS_PER_S; // ms to s
+ result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
+ }
+
+ /**
+ * Converts struct timespec to milliseconds
+ */
+ static const void toMilliseconds(int64_t& result, const struct timespec& value) {
+ result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS);
+ // round up -- int64_t cast is to avoid a compiler error for some GCCs
+ if (int64_t(value.tv_nsec) % NS_PER_MS >= (NS_PER_MS / 2)) {
+ ++result;
+ }
+ }
+
+ /**
+ * Converts struct timeval to milliseconds
+ */
+ static const void toMilliseconds(int64_t& result, const struct timeval& value) {
+ result = (value.tv_sec * MS_PER_S) + (value.tv_usec / US_PER_MS);
+ // round up -- int64_t cast is to avoid a compiler error for some GCCs
+ if (int64_t(value.tv_usec) % US_PER_MS >= (US_PER_MS / 2)) {
+ ++result;
+ }
+ }
+
+ /**
+ * Get current time as milliseconds from epoch
+ */
+ static const int64_t currentTime();
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
new file mode 100644
index 000000000..c80bb883f
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+#include <vector>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
+
+int main(int argc, char** argv) {
+
+ std::string arg;
+
+ std::vector<std::string> args(argc - 1 > 1 ? argc - 1 : 1);
+
+ args[0] = "all";
+
+ for (int ix = 1; ix < argc; ix++) {
+ args[ix - 1] = std::string(argv[ix]);
+ }
+
+ bool runAll = args[0].compare("all") == 0;
+
+ if (runAll || args[0].compare("thread-factory") == 0) {
+
+ ThreadFactoryTests threadFactoryTests;
+
+ std::cout << "ThreadFactory tests..." << std::endl;
+
+ size_t count = 1000;
+ size_t floodLoops = 1;
+ size_t floodCount = 100000;
+
+ std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+
+ assert(threadFactoryTests.reapNThreads(count));
+
+ std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+ assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
+ std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
+
+ assert(threadFactoryTests.synchStartTest());
+
+ std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
+
+ assert(threadFactoryTests.monitorTimeoutTest());
+ }
+
+ if (runAll || args[0].compare("util") == 0) {
+
+ std::cout << "Util tests..." << std::endl;
+
+ std::cout << "\t\tUtil minimum time" << std::endl;
+
+ int64_t time00 = Util::currentTime();
+ int64_t time01 = Util::currentTime();
+
+ std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
+
+ time00 = Util::currentTime();
+ time01 = time00;
+ size_t count = 0;
+
+ while (time01 < time00 + 10) {
+ count++;
+ time01 = Util::currentTime();
+ }
+
+ std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
+ }
+
+
+ if (runAll || args[0].compare("timer-manager") == 0) {
+
+ std::cout << "TimerManager tests..." << std::endl;
+
+ std::cout << "\t\tTimerManager test00" << std::endl;
+
+ TimerManagerTests timerManagerTests;
+
+ assert(timerManagerTests.test00());
+ }
+
+ if (runAll || args[0].compare("thread-manager") == 0) {
+
+ std::cout << "ThreadManager tests..." << std::endl;
+
+ {
+
+ size_t workerCount = 100;
+
+ size_t taskCount = 100000;
+
+ int64_t delay = 10LL;
+
+ std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+ ThreadManagerTests threadManagerTests;
+
+ assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+ std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+ assert(threadManagerTests.blockTest(delay, workerCount));
+
+ }
+ }
+
+ if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
+
+ std::cout << "ThreadManager benchmark tests..." << std::endl;
+
+ {
+
+ size_t minWorkerCount = 2;
+
+ size_t maxWorkerCount = 512;
+
+ size_t tasksPerWorker = 1000;
+
+ int64_t delay = 10LL;
+
+ for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
+
+ size_t taskCount = workerCount * tasksPerWorker;
+
+ std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+ ThreadManagerTests threadManagerTests;
+
+ threadManagerTests.loadTest(taskCount, delay, workerCount);
+ }
+ }
+ }
+}
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
new file mode 100644
index 000000000..859fbaf51
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadFactoryTests {
+
+public:
+
+ static const double ERROR;
+
+ class Task: public Runnable {
+
+ public:
+
+ Task() {}
+
+ void run() {
+ std::cout << "\t\t\tHello World" << std::endl;
+ }
+ };
+
+ /**
+ * Hello world test
+ */
+ bool helloWorldTest() {
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+
+ shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ thread->start();
+
+ thread->join();
+
+ std::cout << "\t\t\tSuccess!" << std::endl;
+
+ return true;
+ }
+
+ /**
+ * Reap N threads
+ */
+ class ReapNTask: public Runnable {
+
+ public:
+
+ ReapNTask(Monitor& monitor, int& activeCount) :
+ _monitor(monitor),
+ _count(activeCount) {}
+
+ void run() {
+ Synchronized s(_monitor);
+
+ _count--;
+
+ //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+ if (_count == 0) {
+ _monitor.notify();
+ }
+ }
+
+ Monitor& _monitor;
+
+ int& _count;
+ };
+
+ bool reapNThreads(int loop=1, int count=10) {
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+
+ Monitor* monitor = new Monitor();
+
+ for(int lix = 0; lix < loop; lix++) {
+
+ int* activeCount = new int(count);
+
+ std::set<shared_ptr<Thread> > threads;
+
+ int tix;
+
+ for (tix = 0; tix < count; tix++) {
+ try {
+ threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
+ }
+
+ tix = 0;
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
+
+ try {
+ (*thread)->start();
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
+ }
+
+ {
+ Synchronized s(*monitor);
+ while (*activeCount > 0) {
+ monitor->wait(1000);
+ }
+ }
+
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ threads.erase(*thread);
+ }
+
+ std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
+ }
+
+ std::cout << "\t\t\tSuccess!" << std::endl;
+
+ return true;
+ }
+
+ class SynchStartTask: public Runnable {
+
+ public:
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ SynchStartTask(Monitor& monitor, volatile STATE& state) :
+ _monitor(monitor),
+ _state(state) {}
+
+ void run() {
+ {
+ Synchronized s(_monitor);
+ if (_state == SynchStartTask::STARTING) {
+ _state = SynchStartTask::STARTED;
+ _monitor.notify();
+ }
+ }
+
+ {
+ Synchronized s(_monitor);
+ while (_state == SynchStartTask::STARTED) {
+ _monitor.wait();
+ }
+
+ if (_state == SynchStartTask::STOPPING) {
+ _state = SynchStartTask::STOPPED;
+ _monitor.notifyAll();
+ }
+ }
+ }
+
+ private:
+ Monitor& _monitor;
+ volatile STATE& _state;
+ };
+
+ bool synchStartTest() {
+
+ Monitor monitor;
+
+ SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+ shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ if (state == SynchStartTask::UNINITIALIZED) {
+
+ state = SynchStartTask::STARTING;
+
+ thread->start();
+ }
+
+ {
+ Synchronized s(monitor);
+ while (state == SynchStartTask::STARTING) {
+ monitor.wait();
+ }
+ }
+
+ assert(state != SynchStartTask::STARTING);
+
+ {
+ Synchronized s(monitor);
+
+ try {
+ monitor.wait(100);
+ } catch(TimedOutException& e) {
+ }
+
+ if (state == SynchStartTask::STARTED) {
+
+ state = SynchStartTask::STOPPING;
+
+ monitor.notify();
+ }
+
+ while (state == SynchStartTask::STOPPING) {
+ monitor.wait();
+ }
+ }
+
+ assert(state == SynchStartTask::STOPPED);
+
+ bool success = true;
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+
+ return true;
+ }
+
+ /** See how accurate monitor timeout is. */
+
+ bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
+
+ Monitor monitor;
+
+ int64_t startTime = Util::currentTime();
+
+ for (size_t ix = 0; ix < count; ix++) {
+ {
+ Synchronized s(monitor);
+ try {
+ monitor.wait(timeout);
+ } catch(TimedOutException& e) {
+ }
+ }
+ }
+
+ int64_t endTime = Util::currentTime();
+
+ double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
+
+ if (error < 0.0) {
+
+ error *= 1.0;
+ }
+
+ bool success = error < ThreadFactoryTests::ERROR;
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
+
+ return success;
+ }
+
+
+ class FloodTask : public Runnable {
+ public:
+
+ FloodTask(const size_t id) :_id(id) {}
+ ~FloodTask(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " done" << std::endl;
+ }
+ }
+
+ void run(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " started" << std::endl;
+ }
+
+ usleep(1);
+ }
+ const size_t _id;
+ };
+
+ void foo(PosixThreadFactory *tf) {
+ }
+
+ bool floodNTest(size_t loop=1, size_t count=100000) {
+
+ bool success = false;
+
+ for(size_t lix = 0; lix < loop; lix++) {
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+ threadFactory.setDetached(true);
+
+ for(size_t tix = 0; tix < count; tix++) {
+
+ try {
+
+ shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ thread->start();
+
+ usleep(1);
+
+ } catch (TException& e) {
+
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+
+ return success;
+ }
+ }
+
+ std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+ success = true;
+ }
+
+ return success;
+ }
+};
+
+const double ThreadFactoryTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
new file mode 100644
index 000000000..e7b517431
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <set>
+#include <iostream>
+#include <set>
+#include <stdint.h>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadManagerTests {
+
+public:
+
+ static const double ERROR;
+
+ class Task: public Runnable {
+
+ public:
+
+ Task(Monitor& monitor, size_t& count, int64_t timeout) :
+ _monitor(monitor),
+ _count(count),
+ _timeout(timeout),
+ _done(false) {}
+
+ void run() {
+
+ _startTime = Util::currentTime();
+
+ {
+ Synchronized s(_sleep);
+
+ try {
+ _sleep.wait(_timeout);
+ } catch(TimedOutException& e) {
+ ;
+ }catch(...) {
+ assert(0);
+ }
+ }
+
+ _endTime = Util::currentTime();
+
+ _done = true;
+
+ {
+ Synchronized s(_monitor);
+
+ // std::cout << "Thread " << _count << " completed " << std::endl;
+
+ _count--;
+
+ if (_count == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ size_t& _count;
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
+ bool _done;
+ Monitor _sleep;
+ };
+
+ /**
+ * Dispatch count tasks, each of which blocks for timeout milliseconds then
+ * completes. Verify that all tasks completed and that thread manager cleans
+ * up properly on delete.
+ */
+ bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
+
+ Monitor monitor;
+
+ size_t activeCount = count;
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+ for (size_t ix = 0; ix < count; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+ }
+
+ int64_t time00 = Util::currentTime();
+
+ for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+ threadManager->add(*ix);
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCount > 0) {
+
+ monitor.wait();
+ }
+ }
+
+ int64_t time01 = Util::currentTime();
+
+ int64_t firstTime = 9223372036854775807LL;
+ int64_t lastTime = 0;
+
+ double averageTime = 0;
+ int64_t minTime = 9223372036854775807LL;
+ int64_t maxTime = 0;
+
+ for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+ shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+ int64_t delta = task->_endTime - task->_startTime;
+
+ assert(delta > 0);
+
+ if (task->_startTime < firstTime) {
+ firstTime = task->_startTime;
+ }
+
+ if (task->_endTime > lastTime) {
+ lastTime = task->_endTime;
+ }
+
+ if (delta < minTime) {
+ minTime = delta;
+ }
+
+ if (delta > maxTime) {
+ maxTime = delta;
+ }
+
+ averageTime+= delta;
+ }
+
+ averageTime /= count;
+
+ std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
+
+ double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+ double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+ if (error < 0) {
+ error*= -1.0;
+ }
+
+ bool success = error < ERROR;
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
+
+ return success;
+ }
+
+ class BlockTask: public Runnable {
+
+ public:
+
+ BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+ _monitor(monitor),
+ _bmonitor(bmonitor),
+ _count(count) {}
+
+ void run() {
+ {
+ Synchronized s(_bmonitor);
+
+ _bmonitor.wait();
+
+ }
+
+ {
+ Synchronized s(_monitor);
+
+ _count--;
+
+ if (_count == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ Monitor& _bmonitor;
+ size_t& _count;
+ };
+
+ /**
+ * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
+ * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
+
+ bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
+
+ bool success = false;
+
+ try {
+
+ Monitor bmonitor;
+ Monitor monitor;
+
+ size_t pendingTaskMaxCount = workerCount;
+
+ size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+ for (size_t ix = 0; ix < workerCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+ }
+
+ for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+ }
+
+ for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ threadManager->add(*ix);
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+ try {
+ threadManager->add(extraTask, 1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch(TimedOutException& e) {
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[0] != 0) {
+ monitor.wait();
+ }
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ try {
+ threadManager->add(extraTask, 1);
+ } catch(TimedOutException& e) {
+ std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
+ throw TException("Unexpected timeout adding task");
+
+ } catch(TooManyPendingTasksException& e) {
+ std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+ throw TException("Unexpected timeout adding task");
+ }
+
+ // Wake up tasks that were pending before and wait for them to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[1] != 0) {
+ monitor.wait();
+ }
+ }
+
+ // Wake up the extra task and wait for it to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[2] != 0) {
+ monitor.wait();
+ }
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == 0))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ } catch(TException& e) {
+ }
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+ return success;
+ }
+};
+
+const double ThreadManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+
+using namespace apache::thrift::concurrency::test;
+
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
new file mode 100644
index 000000000..e6fe6ce7e
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <concurrency/TimerManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class TimerManagerTests {
+
+ public:
+
+ static const double ERROR;
+
+ class Task: public Runnable {
+ public:
+
+ Task(Monitor& monitor, int64_t timeout) :
+ _timeout(timeout),
+ _startTime(Util::currentTime()),
+ _monitor(monitor),
+ _success(false),
+ _done(false) {}
+
+ ~Task() { std::cerr << this << std::endl; }
+
+ void run() {
+
+ _endTime = Util::currentTime();
+
+ // Figure out error percentage
+
+ int64_t delta = _endTime - _startTime;
+
+
+ delta = delta > _timeout ? delta - _timeout : _timeout - delta;
+
+ float error = delta / _timeout;
+
+ if(error < ERROR) {
+ _success = true;
+ }
+
+ _done = true;
+
+ std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
+ {Synchronized s(_monitor);
+ _monitor.notifyAll();
+ }
+ }
+
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
+ Monitor& _monitor;
+ bool _success;
+ bool _done;
+ };
+
+ /**
+ * This test creates two tasks and waits for the first to expire within 10%
+ * of the expected expiration time. It then verifies that the timer manager
+ * properly clean up itself and the remaining orphaned timeout task when the
+ * manager goes out of scope and its destructor is called.
+ */
+ bool test00(int64_t timeout=1000LL) {
+
+ shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
+
+ {
+
+ TimerManager timerManager;
+
+ timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
+
+ timerManager.start();
+
+ assert(timerManager.state() == TimerManager::STARTED);
+
+ shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+
+ {
+ Synchronized s(_monitor);
+
+ timerManager.add(orphanTask, 10 * timeout);
+
+ timerManager.add(task, timeout);
+
+ _monitor.wait();
+ }
+
+ assert(task->_done);
+
+
+ std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+ }
+
+ // timerManager.stop(); This is where it happens via destructor
+
+ assert(!orphanTask->_done);
+
+ return true;
+ }
+
+ friend class TestTask;
+
+ Monitor _monitor;
+};
+
+const double TimerManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+