summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--configure.ac17
-rw-r--r--lib/cpp/Makefile.am19
-rw-r--r--lib/cpp/src/concurrency/BoostMonitor.cpp203
-rw-r--r--lib/cpp/src/concurrency/BoostMutex.cpp59
-rw-r--r--lib/cpp/src/concurrency/BoostThreadFactory.cpp182
-rw-r--r--lib/cpp/src/concurrency/BoostThreadFactory.h75
-rw-r--r--lib/cpp/src/concurrency/Mutex.h2
-rw-r--r--lib/cpp/src/concurrency/PlatformThreadFactory.h40
-rw-r--r--lib/cpp/src/concurrency/PosixThreadFactory.cpp2
-rw-r--r--lib/cpp/src/concurrency/Thread.h8
-rw-r--r--lib/cpp/src/concurrency/test/ThreadFactoryTests.h12
-rw-r--r--lib/cpp/src/concurrency/test/ThreadManagerTests.h12
-rw-r--r--lib/cpp/src/concurrency/test/TimerManagerTests.h4
-rw-r--r--lib/cpp/src/server/TThreadedServer.cpp8
-rw-r--r--lib/cpp/src/transport/TFDTransport.cpp2
-rw-r--r--lib/cpp/src/transport/TFileTransport.cpp109
-rw-r--r--lib/cpp/src/transport/TFileTransport.h23
-rw-r--r--lib/cpp/src/transport/TSocket.cpp6
-rw-r--r--lib/cpp/src/windows/config.h19
-rwxr-xr-xtest/cpp/Makefile.am6
-rwxr-xr-xtest/cpp/src/StressTest.cpp4
-rwxr-xr-xtest/cpp/src/StressTestNonBlocking.cpp4
-rwxr-xr-xtest/cpp/src/TestClient.cpp6
-rwxr-xr-xtest/cpp/src/TestServer.cpp6
-rw-r--r--test/threads/ThreadsClient.cpp6
-rw-r--r--test/threads/ThreadsServer.cpp10
27 files changed, 762 insertions, 83 deletions
diff --git a/.gitignore b/.gitignore
index 082f7b730..02d66d936 100644
--- a/.gitignore
+++ b/.gitignore
@@ -129,6 +129,7 @@
/lib/rb/ext/thrift_native.so
/lib/rb/spec/gen-*
/lib/rb/test/
+/lib/rb/thrift-*.gem
/lib/php/Makefile
/lib/php/Makefile.in
/lib/php/src/ext/thrift_protocol/.deps
diff --git a/configure.ac b/configure.ac
index 44fd168d4..9759a5a60 100644
--- a/configure.ac
+++ b/configure.ac
@@ -432,6 +432,23 @@ AC_SUBST(GCOV_CFLAGS)
AC_SUBST(GCOV_CXXFLAGS)
AC_SUBST(GCOV_LDFLAGS)
+AC_ARG_ENABLE(boostthreads,
+ [ --enable-boostthreads use boost threads, instead of POSIX pthread (experimental) ],
+ [case "${enableval}" in
+ yes) ENABLE_BOOSTTHREADS=1 ;;
+ no) ENABLE_BOOSTTHREADS=0 ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-cov) ;;
+ esac],
+ [ENABLE_BOOSTTHREADS=2])
+
+
+if test "x[$]ENABLE_BOOSTTHREADS" = "x1"; then
+ AC_MSG_WARN(enable boostthreads)
+ AC_DEFINE([USE_BOOST_THREAD], [1], [experimental --enable-boostthreads that replaces POSIX pthread by boost::thread])
+fi
+
+AM_CONDITIONAL([WITH_BOOSTTHREADS], [test "x[$]ENABLE_BOOSTTHREADS" = "x1"])
+
AC_CONFIG_HEADERS(config.h:config.hin)
AC_CONFIG_FILES([
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index d5bc48997..593ef9e03 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -41,14 +41,12 @@ endif
AM_CXXFLAGS = -Wall
AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(srcdir)/src
+AM_LDFLAGS = $(BOOST_LDFLAGS)
# Define the source files for the module
libthrift_la_SOURCES = src/Thrift.cpp \
src/TApplicationException.cpp \
- src/concurrency/Mutex.cpp \
- src/concurrency/Monitor.cpp \
- src/concurrency/PosixThreadFactory.cpp \
src/concurrency/ThreadManager.cpp \
src/concurrency/TimerManager.cpp \
src/concurrency/Util.cpp \
@@ -77,6 +75,17 @@ libthrift_la_SOURCES = src/Thrift.cpp \
src/async/TAsyncChannel.cpp \
src/processor/PeekProcessor.cpp
+if WITH_BOOSTTHREADS
+libthrift_la_SOURCES += src/concurrency/BoostThreadFactory.cpp \
+ src/concurrency/BoostMonitor.cpp \
+ src/concurrency/BoostMutex.cpp
+else
+libthrift_la_SOURCES += src/concurrency/Mutex.cpp \
+ src/concurrency/Monitor.cpp \
+ src/concurrency/PosixThreadFactory.cpp
+endif
+
+
libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
src/async/TAsyncProtocolProcessor.cpp \
src/async/TEvhttpServer.cpp \
@@ -91,6 +100,10 @@ libthriftz_la_CPPFLAGS = $(AM_CPPFLAGS) $(ZLIB_CPPFLAGS)
libthriftnb_la_CXXFLAGS = $(AM_CXXFLAGS)
libthriftz_la_CXXFLAGS = $(AM_CXXFLAGS)
+if WITH_BOOSTTHREADS
+libthrift_la_LIBADD = -lboost_thread
+endif
+
include_thriftdir = $(includedir)/thrift
include_thrift_HEADERS = \
$(top_builddir)/config.h \
diff --git a/lib/cpp/src/concurrency/BoostMonitor.cpp b/lib/cpp/src/concurrency/BoostMonitor.cpp
new file mode 100644
index 000000000..7a9b589bd
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostMonitor.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "Monitor.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <errno.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using namespace boost::interprocess;
+
+/**
+ * Monitor implementation using the boost interprocess library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl : public interprocess_condition {
+
+ public:
+
+ Impl()
+ : ownedMutex_(new Mutex()),
+ mutex_(NULL) {
+ init(ownedMutex_.get());
+ }
+
+ Impl(Mutex* mutex)
+ : mutex_(NULL) {
+ init(mutex);
+ }
+
+ Impl(Monitor* monitor)
+ : mutex_(NULL) {
+ init(&(monitor->mutex()));
+ }
+
+ Mutex& mutex() { return *mutex_; }
+ void lock() { mutex().lock(); }
+ void unlock() { mutex().unlock(); }
+
+ /**
+ * Exception-throwing version of waitForTimeRelative(), called simply
+ * wait(int64) for historical reasons. Timeout is in milliseconds.
+ *
+ * If the condition occurs, this function returns cleanly; on timeout or
+ * error an exception is thrown.
+ */
+ void wait(int64_t timeout_ms) {
+ int result = waitForTimeRelative(timeout_ms);
+ if (result == ETIMEDOUT) {
+ throw TimedOutException();
+ } else if (result != 0) {
+ throw TException(
+ "Monitor::wait() failed");
+ }
+ }
+
+ /**
+ * Waits until the specified timeout in milliseconds for the condition to
+ * occur, or waits forever if timeout_ms == 0.
+ *
+ * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTimeRelative(int64_t timeout_ms) {
+ if (timeout_ms == 0LL) {
+ return waitForever();
+ }
+
+ assert(mutex_);
+ interprocess_mutex* mutexImpl =
+ reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+ int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT;
+ lock.release();
+ return res;
+ }
+
+ /**
+ * Waits until the absolute time specified using struct timespec.
+ * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const timespec* abstime) {
+ assert(mutex_);
+ interprocess_mutex* mutexImpl =
+ reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ struct timespec currenttime;
+ Util::toTimespec(currenttime, Util::currentTime());
+
+ long tv_sec = abstime->tv_sec - currenttime.tv_sec;
+ long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
+ if(tv_sec < 0)
+ tv_sec = 0;
+ if(tv_nsec < 0)
+ tv_nsec = 0;
+
+ scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+ int res = timed_wait(lock, boost::get_system_time() +
+ boost::posix_time::seconds(tv_sec) +
+ boost::posix_time::microseconds(tv_nsec / 1000)
+ ) ? 0 : ETIMEDOUT;
+ lock.release();
+ return res;
+ }
+
+ /**
+ * Waits forever until the condition occurs.
+ * Returns 0 if condition occurs, or an error code otherwise.
+ */
+ int waitForever() {
+ assert(mutex_);
+ interprocess_mutex* mutexImpl =
+ reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+ ((interprocess_condition*)this)->wait(lock);
+ lock.release();
+ return 0;
+ }
+
+
+ void notify() {
+ notify_one();
+ }
+
+ void notifyAll() {
+ notify_all();
+ }
+
+ private:
+
+ void init(Mutex* mutex) {
+ mutex_ = mutex;
+ }
+
+ boost::scoped_ptr<Mutex> ownedMutex_;
+ Mutex* mutex_;
+};
+
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
+
+Monitor::~Monitor() { delete impl_; }
+
+Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }
+
+void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }
+
+void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }
+
+void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }
+
+int Monitor::waitForTime(const timespec* abstime) const {
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
+}
+
+int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
+ return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
+}
+
+int Monitor::waitForever() const {
+ return const_cast<Monitor::Impl*>(impl_)->waitForever();
+}
+
+void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }
+
+void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/BoostMutex.cpp b/lib/cpp/src/concurrency/BoostMutex.cpp
new file mode 100644
index 000000000..2277f6158
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostMutex.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "Mutex.h"
+#include "Util.h"
+
+#include <cassert>
+#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+
+using namespace boost::interprocess;
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Implementation of Mutex class using boost interprocess mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl : public interprocess_mutex {
+};
+
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {}
+
+void* Mutex::getUnderlyingImpl() const { return impl_.get(); }
+
+void Mutex::lock() const { impl_->lock(); }
+
+bool Mutex::trylock() const { return impl_->try_lock(); }
+
+bool Mutex::timedlock(int64_t ms) const { return impl_->timed_lock(boost::get_system_time()+boost::posix_time::milliseconds(ms)); }
+
+void Mutex::unlock() const { impl_->unlock(); }
+
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+}
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/concurrency/BoostThreadFactory.cpp
new file mode 100644
index 000000000..555152825
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostThreadFactory.cpp
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "BoostThreadFactory.h"
+#include "Exception.h"
+
+#include <cassert>
+
+#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::weak_ptr;
+
+/**
+ * The boost thread class.
+ *
+ * @version $Id:$
+ */
+class BoostThread: public Thread {
+ public:
+
+ enum STATE {
+ uninitialized,
+ starting,
+ started,
+ stopping,
+ stopped
+ };
+
+ static void* threadMain(void* arg);
+
+ private:
+ std::auto_ptr<boost::thread> thread_;
+ STATE state_;
+ weak_ptr<BoostThread> self_;
+ bool detached_;
+
+ public:
+
+ BoostThread(bool detached, shared_ptr<Runnable> runnable) :
+ state_(uninitialized),
+ detached_(detached) {
+ this->Thread::runnable(runnable);
+ }
+
+ ~BoostThread() {
+ if(!detached_) {
+ try {
+ join();
+ } catch(...) {
+ // We're really hosed.
+ }
+ }
+ }
+
+ void start() {
+ if (state_ != uninitialized) {
+ return;
+ }
+
+ // Create reference
+ shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>();
+ *selfRef = self_.lock();
+
+ thread_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef)));
+
+ if(detached_)
+ thread_->detach();
+
+ state_ = starting;
+ }
+
+ void join() {
+ if (!detached_ && state_ != uninitialized) {
+ thread_->join();
+ }
+ }
+
+ Thread::id_t getId() {
+ return thread_.get() ? thread_->get_id() : boost::thread::id();
+ }
+
+ shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+ void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
+
+ void weakRef(shared_ptr<BoostThread> self) {
+ assert(self.get() == this);
+ self_ = weak_ptr<BoostThread>(self);
+ }
+};
+
+void* BoostThread::threadMain(void* arg) {
+ shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg;
+ delete reinterpret_cast<shared_ptr<BoostThread>*>(arg);
+
+ if (thread == NULL) {
+ return (void*)0;
+ }
+
+ if (thread->state_ != starting) {
+ return (void*)0;
+ }
+
+ thread->state_ = started;
+ thread->runnable()->run();
+
+ if (thread->state_ != stopping && thread->state_ != stopped) {
+ thread->state_ = stopping;
+ }
+ return (void*)0;
+}
+
+/**
+ * POSIX Thread factory implementation
+ */
+class BoostThreadFactory::Impl {
+
+ private:
+ bool detached_;
+
+ public:
+
+ Impl(bool detached) :
+ detached_(detached) {}
+
+ /**
+ * Creates a new POSIX thread to run the runnable object
+ *
+ * @param runnable A runnable object
+ */
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
+ shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
+ }
+
+ bool isDetached() const { return detached_; }
+
+ void setDetached(bool value) { detached_ = value; }
+
+ Thread::id_t getCurrentThreadId() const {
+ return boost::this_thread::get_id();
+ }
+
+};
+
+BoostThreadFactory::BoostThreadFactory(bool detached) :
+ impl_(new BoostThreadFactory::Impl(detached)) {}
+
+shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+
+bool BoostThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void BoostThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t BoostThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.h b/lib/cpp/src/concurrency/BoostThreadFactory.h
new file mode 100644
index 000000000..a46670585
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostThreadFactory.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1
+
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create posix threads
+ *
+ * @version $Id:$
+ */
+class BoostThreadFactory : public ThreadFactory {
+
+ public:
+
+ /**
+ * Boost thread factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * Threads are created with the specified boost policy, priority, stack-size. A detachable thread is not
+ * joinable.
+ *
+ * By default threads are not joinable.
+ */
+
+ BoostThreadFactory(bool detached=true);
+
+ // From ThreadFactory;
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Sets detached mode of threads
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+private:
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 4b1c3bf1f..847aaf692 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -167,7 +167,7 @@ class RWGuard : boost::noncopyable {
// A little hack to prevent someone from trying to do "Guard(m);"
// Such a use is invalid because the temporary Guard object is
-// destoryed at the end of the line, releasing the lock.
+// destroyed at the end of the line, releasing the lock.
// Sorry for polluting the global namespace, but I think it's worth it.
#define Guard(m) incorrect_use_of_Guard(m)
#define RWGuard(m) incorrect_use_of_RWGuard(m)
diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h
new file mode 100644
index 000000000..9f053a039
--- /dev/null
+++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
+
+#ifndef USE_BOOST_THREAD
+# include <concurrency/PosixThreadFactory.h>
+#else
+# include <concurrency/BoostThreadFactory.h>
+#endif
+
+namespace apache { namespace thrift { namespace concurrency {
+
+#ifndef USE_BOOST_THREAD
+ typedef PosixThreadFactory PlatformThreadFactory;
+#include <concurrency/PosixThreadFactory.h>
+#else
+ typedef BoostThreadFactory PlatformThreadFactory;
+#endif
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index fe5ba123c..70204f11e 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -195,7 +195,7 @@ void* PthreadThread::threadMain(void* arg) {
ProfilerRegisterThread();
#endif
- thread->state_ = starting;
+ thread->state_ = started;
thread->runnable()->run();
if (thread->state_ != stopping && thread->state_ != stopped) {
thread->state_ = stopping;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index d4282adbc..a9e15af45 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -24,6 +24,10 @@
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
+
namespace apache { namespace thrift { namespace concurrency {
class Thread;
@@ -68,7 +72,11 @@ class Thread {
public:
+#ifdef USE_BOOST_THREAD
+ typedef boost::thread::id id_t;
+#else
typedef uint64_t id_t;
+#endif
virtual ~Thread() {};
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 2d7976e17..d9066b5d3 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -19,7 +19,7 @@
#include <config.h>
#include <concurrency/Thread.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
@@ -60,7 +60,7 @@ public:
*/
bool helloWorldTest() {
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PlatformThreadFactory threadFactory = PlatformThreadFactory();
shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
@@ -105,7 +105,7 @@ public:
bool reapNThreads(int loop=1, int count=10) {
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PlatformThreadFactory threadFactory = PlatformThreadFactory();
Monitor* monitor = new Monitor();
@@ -203,7 +203,7 @@ public:
shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PlatformThreadFactory threadFactory = PlatformThreadFactory();
shared_ptr<Thread> thread = threadFactory.newThread(task);
@@ -307,7 +307,7 @@ public:
const size_t _id;
};
- void foo(PosixThreadFactory *tf) {
+ void foo(PlatformThreadFactory *tf) {
(void) tf;
}
@@ -317,7 +317,7 @@ public:
for(size_t lix = 0; lix < loop; lix++) {
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PlatformThreadFactory threadFactory = PlatformThreadFactory();
threadFactory.setDetached(true);
for(size_t tix = 0; tix < count; tix++) {
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index b6b5c3e47..e12201c0c 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -19,7 +19,7 @@
#include <config.h>
#include <concurrency/ThreadManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
@@ -110,10 +110,11 @@ public:
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
- shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+#ifndef USE_BOOST_THREAD
threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-
+#endif
threadManager->threadFactory(threadFactory);
threadManager->start();
@@ -249,10 +250,11 @@ public:
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
- shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+#ifndef USE_BOOST_THREAD
threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-
+#endif
threadManager->threadFactory(threadFactory);
threadManager->start();
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index b89074c07..41f1674d1 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -18,7 +18,7 @@
*/
#include <concurrency/TimerManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
@@ -100,7 +100,7 @@ class TimerManagerTests {
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
+ timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
timerManager.start();
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index f40135c5d..6b816a4f6 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -19,12 +19,14 @@
#include "server/TThreadedServer.h"
#include "transport/TTransportException.h"
-#include "concurrency/PosixThreadFactory.h"
+#include <concurrency/PlatformThreadFactory.h>
#include <string>
#include <iostream>
-#include <pthread.h>
+
+#ifdef HAVE_UNISTD_H
#include <unistd.h>
+#endif
namespace apache { namespace thrift { namespace server {
@@ -123,7 +125,7 @@ void TThreadedServer::init() {
stop_ = false;
if (!threadFactory_) {
- threadFactory_.reset(new PosixThreadFactory);
+ threadFactory_.reset(new PlatformThreadFactory);
}
}
diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp
index b1479fa5f..8a448fa2d 100644
--- a/lib/cpp/src/transport/TFDTransport.cpp
+++ b/lib/cpp/src/transport/TFDTransport.cpp
@@ -22,7 +22,9 @@
#include <transport/TFDTransport.h>
+#ifdef HAVE_UNISTD_H
#include <unistd.h>
+#endif
using namespace std;
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index c6c315502..405c162dc 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -47,12 +47,17 @@
#include <sys/stat.h>
#endif
+#ifdef _WIN32
+#include <io.h>
+#endif
+
namespace apache { namespace thrift { namespace transport {
using boost::scoped_ptr;
using boost::shared_ptr;
using namespace std;
using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
#ifndef HAVE_CLOCK_GETTIME
@@ -102,13 +107,10 @@ TFileTransport::TFileTransport(string path, bool readOnly)
, lastBadChunk_(0)
, numCorruptedEventsInChunk_(0)
, readOnly_(readOnly)
+ , notFull_(&mutex_)
+ , notEmpty_(&mutex_)
+ , flushed_(&mutex_)
{
- // initialize all the condition vars/mutexes
- pthread_mutex_init(&mutex_, NULL);
- pthread_cond_init(&notFull_, NULL);
- pthread_cond_init(&notEmpty_, NULL);
- pthread_cond_init(&flushed_, NULL);
-
openLogFile();
}
@@ -142,16 +144,25 @@ void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
TFileTransport::~TFileTransport() {
// flush the buffer if a writer thread is active
+#ifdef USE_BOOST_THREAD
+ if(writerThreadId_.get()) {
+#else
if (writerThreadId_ > 0) {
+#endif
// set state to closing
closing_ = true;
// wake up the writer thread
// Since closing_ is true, it will attempt to flush all data, then exit.
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
+#ifdef USE_BOOST_THREAD
+ writerThreadId_->join();
+ writerThreadId_.reset();
+#else
pthread_join(writerThreadId_, NULL);
writerThreadId_ = 0;
+#endif
}
if (dequeueBuffer_) {
@@ -191,12 +202,18 @@ bool TFileTransport::initBufferAndWriteThread() {
return false;
}
+#ifdef USE_BOOST_THREAD
+ if(!writerThreadId_.get()) {
+ writerThreadId_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(startWriterThread, (void *)this)));
+ }
+#else
if (writerThreadId_ == 0) {
if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
T_ERROR("%s", "Could not create writer thread");
return false;
}
}
+#endif
dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
@@ -242,20 +259,19 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
toEnqueue->eventSize_ = eventLen + 4;
// lock mutex
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
// make sure that enqueue buffer is initialized and writer thread is running
if (!bufferAndThreadInitialized_) {
if (!initBufferAndWriteThread()) {
delete toEnqueue;
- pthread_mutex_unlock(&mutex_);
return;
}
}
// Can't enqueue while buffer is full
while (enqueueBuffer_->isFull()) {
- pthread_cond_wait(&notFull_, &mutex_);
+ notFull_.wait();
}
// We shouldn't be trying to enqueue new data while a forced flush is
@@ -266,23 +282,21 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// add to the buffer
if (!enqueueBuffer_->addEvent(toEnqueue)) {
delete toEnqueue;
- pthread_mutex_unlock(&mutex_);
return;
}
// signal anybody who's waiting for the buffer to be non-empty
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
// this really should be a loop where it makes sure it got flushed
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
- pthread_mutex_unlock(&mutex_);
}
bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
- pthread_mutex_lock(&mutex_);
-
bool swap;
+ Guard g(mutex_);
+
if (!enqueueBuffer_->isEmpty()) {
swap = true;
} else if (closing_) {
@@ -292,10 +306,10 @@ bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
} else {
if (deadline != NULL) {
// if we were handed a deadline time struct, do a timed wait
- pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+ notEmpty_.waitForTime(deadline);
} else {
// just wait until the buffer gets an item
- pthread_cond_wait(&notEmpty_, &mutex_);
+ notEmpty_.wait();
}
// could be empty if we timed out
@@ -308,11 +322,9 @@ bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
dequeueBuffer_ = temp;
}
- // unlock the mutex and signal if required
- pthread_mutex_unlock(&mutex_);
if (swap) {
- pthread_cond_signal(&notFull_);
+ notFull_.notify();
}
return swap;
@@ -340,7 +352,11 @@ void TFileTransport::writerThread() {
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
ftruncate(fd_, offset_);
+#else
+ _chsize_s(fd_, offset_);
+#endif
readState_.resetAllValues();
} catch (...) {
int errno_copy = errno;
@@ -358,12 +374,18 @@ void TFileTransport::writerThread() {
// this will only be true when the destructor is being invoked
if (closing_) {
if (hasIOError) {
- pthread_exit(NULL);
+#ifndef USE_BOOST_THREAD
+ pthread_exit(NULL);
+#else
+ return;
+#endif
}
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
fsync(fd_);
+#endif
if (-1 == ::close(fd_)) {
int errno_copy = errno;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
@@ -371,8 +393,12 @@ void TFileTransport::writerThread() {
//fd successfully closed
fd_ = 0;
}
+#ifndef USE_BOOST_THREAD
pthread_exit(NULL);
- }
+#else
+ return;
+#endif
+ }
}
if (swapEventBuffers(&ts_next_flush)) {
@@ -387,7 +413,11 @@ void TFileTransport::writerThread() {
T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
usleep(writerThreadIOErrorSleepTime_);
if (closing_) {
+#ifndef USE_BOOST_THREAD
pthread_exit(NULL);
+#else
+ return;
+#endif
}
if (!fd_) {
::close(fd_);
@@ -467,7 +497,8 @@ void TFileTransport::writerThread() {
// time, it could have changed state in between. This will result in us
// making inconsistent decisions.
bool forced_flush = false;
- pthread_mutex_lock(&mutex_);
+ {
+ Guard g(mutex_);
if (forceFlush_) {
if (!enqueueBuffer_->isEmpty()) {
// If forceFlush_ is true, we need to flush all available data.
@@ -479,12 +510,11 @@ void TFileTransport::writerThread() {
// forceFlush_. Therefore the next time around the loop enqueueBuffer_
// is guaranteed to be empty. (I.e., we're guaranteed to make progress
// and clear forceFlush_ the next time around the loop.)
- pthread_mutex_unlock(&mutex_);
continue;
}
forced_flush = true;
- }
- pthread_mutex_unlock(&mutex_);
+ }
+ }
// determine if we need to perform an fsync
bool flush = false;
@@ -508,18 +538,19 @@ void TFileTransport::writerThread() {
if (flush) {
// sync (force flush) file to disk
+#ifndef _WIN32
fsync(fd_);
+#endif
unflushed = 0;
getNextFlushTime(&ts_next_flush);
// notify anybody waiting for flush completion
if (forced_flush) {
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
forceFlush_ = false;
assert(enqueueBuffer_->isEmpty());
assert(dequeueBuffer_->isEmpty());
- pthread_cond_broadcast(&flushed_);
- pthread_mutex_unlock(&mutex_);
+ flushed_.notifyAll();
}
}
}
@@ -527,22 +558,26 @@ void TFileTransport::writerThread() {
void TFileTransport::flush() {
// file must be open for writing for any flushing to take place
+#ifdef USE_BOOST_THREAD
+ if (!writerThreadId_.get()) {
+ return;
+ }
+#else
if (writerThreadId_ <= 0) {
return;
}
+#endif
// wait for flush to take place
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
// Indicate that we are requesting a flush
forceFlush_ = true;
// Wake up the writer thread so it will perform the flush immediately
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
while (forceFlush_) {
- pthread_cond_wait(&flushed_, &mutex_);
+ flushed_.wait();
}
-
- pthread_mutex_unlock(&mutex_);
}
@@ -892,9 +927,15 @@ uint32_t TFileTransport::getCurChunk() {
// Utility Functions
void TFileTransport::openLogFile() {
+#ifndef _WIN32
mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+ int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
+ int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
+ fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
offset_ = 0;
// make sure open call was successful
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 2ea8c9af2..b0e48d1b2 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -27,15 +27,26 @@
#include <string>
#include <stdio.h>
+#ifdef HAVE_PTHREAD_H
#include <pthread.h>
+#endif
+
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include "concurrency/Mutex.h"
+#include "concurrency/Monitor.h"
+
namespace apache { namespace thrift { namespace transport {
using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
// Data pertaining to a single event
typedef struct eventInfo {
@@ -360,7 +371,11 @@ class TFileTransport : public TFileReaderTransport,
static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
// writer thread id
- pthread_t writerThreadId_;
+#ifdef USE_BOOST_THREAD
+ std::auto_ptr<boost::thread> writerThreadId_;
+#else
+ pthread_t writerThreadId_;
+#endif
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
// needs to be written to the file. The buffers are swapped by the writer thread.
@@ -368,15 +383,15 @@ class TFileTransport : public TFileReaderTransport,
TFileTransportBuffer *enqueueBuffer_;
// conditions used to block when the buffer is full or empty
- pthread_cond_t notFull_, notEmpty_;
+ Monitor notFull_, notEmpty_;
volatile bool closing_;
// To keep track of whether the buffer has been flushed
- pthread_cond_t flushed_;
+ Monitor flushed_;
volatile bool forceFlush_;
// Mutex that is grabbed when enqueueing and swapping the read/write buffers
- pthread_mutex_t mutex_;
+ Mutex mutex_;
// File information
std::string filename_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 2db8f8b70..a0cc77a39 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -496,6 +496,12 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
}
#endif
+#ifdef _WIN32
+ if(errno_copy == WSAECONNRESET) {
+ return 0; // EOF
+ }
+#endif
+
// Now it's not a try again case, but a real probblez
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h
index 0f9a30438..2db25967d 100644
--- a/lib/cpp/src/windows/config.h
+++ b/lib/cpp/src/windows/config.h
@@ -32,6 +32,7 @@
#pragma warning(disable: 4250) // Inherits via dominance.
#define HAVE_GETTIMEOFDAY 1
+#define HAVE_SYS_STAT_H 1
#include "TargetVersion.h"
#include "GetTimeOfDay.h"
@@ -53,13 +54,25 @@ typedef boost::uint8_t uint8_t;
#pragma comment(lib, "Ws2_32.lib")
// pthreads
-#include <pthread.h>
+#if 0
+# include <pthread.h>
+#else
+struct timespec {
+ long tv_sec;
+ long tv_nsec;
+};
+# define USE_BOOST_THREAD 1
+# define ctime_r( _clock, _buf ) \
+ ( strcpy( (_buf), ctime( (_clock) ) ), \
+ (_buf) )
+#endif
typedef ptrdiff_t ssize_t;
// Missing functions.
#define usleep(ms) Sleep(ms)
+#if WINVER <= 0x0502
#define poll(fds, nfds, timeout) \
poll_win32(fds, nfds, timeout)
@@ -80,6 +93,10 @@ inline int poll_win32(LPWSAPOLLFD fdArray, ULONG fds, INT timeout)
timeval time_out = {timeout * 0.001, timeout * 1000};
return select(1, &read_fds, &write_fds, &except_fds, &time_out);
}
+#else
+# define poll(fds, nfds, timeout) \
+ WSAPoll(fds, nfds, timeout)
+#endif // WINVER
inline void close(SOCKET socket)
{
diff --git a/test/cpp/Makefile.am b/test/cpp/Makefile.am
index 6c62cfbcf..95574fc56 100755
--- a/test/cpp/Makefile.am
+++ b/test/cpp/Makefile.am
@@ -95,9 +95,9 @@ gen-cpp/ThriftTest.cpp gen-cpp/StressTest_types.cpp gen-cpp/StressTest_constants
INCLUDES = \
-I$(top_srcdir)/lib/cpp/src -Igen-cpp
-AM_CPPFLAGS = $(BOOST_CPPFLAGS)
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(LIBEVENT_CPPFLAGS)
AM_CXXFLAGS = -Wall
-AM_LDFLAGS = $(BOOST_LDFLAGS)
+AM_LDFLAGS = $(BOOST_LDFLAGS) $(LIBEVENT_LDFLAGS)
clean-local:
$(RM) -r gen-cpp
@@ -109,5 +109,3 @@ EXTRA_DIST = \
src/StressTestNonBlocking.cpp \
realloc/realloc_test.c \
realloc/Makefile
-
-
diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp
index 4892722d2..339e7d131 100755
--- a/test/cpp/src/StressTest.cpp
+++ b/test/cpp/src/StressTest.cpp
@@ -18,7 +18,7 @@
*/
#include <concurrency/ThreadManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
#include <concurrency/Mutex.h>
@@ -326,7 +326,7 @@ int main(int argc, char **argv) {
cerr << usage;
}
- shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
// Dispatcher
shared_ptr<Server> serviceHandler(new Server());
diff --git a/test/cpp/src/StressTestNonBlocking.cpp b/test/cpp/src/StressTestNonBlocking.cpp
index 0d8bc3ab9..2ff507b75 100755
--- a/test/cpp/src/StressTestNonBlocking.cpp
+++ b/test/cpp/src/StressTestNonBlocking.cpp
@@ -18,7 +18,7 @@
*/
#include <concurrency/ThreadManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Monitor.h>
#include <concurrency/Util.h>
#include <concurrency/Mutex.h>
@@ -321,7 +321,7 @@ int main(int argc, char **argv) {
cerr << usage;
}
- shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
// Dispatcher
shared_ptr<Server> serviceHandler(new Server());
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index 9aed55105..c1d6e0700 100755
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -46,8 +46,6 @@ using namespace apache::thrift::transport;
using namespace thrift::test;
using namespace apache::thrift::async;
-using std::tr1::placeholders::_1;
-
//extern uint32_t g_socket_syscalls;
// Current time, microseconds since the epoch
@@ -86,7 +84,7 @@ static void testVoid_clientReturn(const char* host, int port, event_base *base,
delete client;
shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel(host, "/", host, port, base));
client = new ThriftTestCobClient(channel, protocolFactory);
- client->testString(tr1::bind(testString_clientReturn, host, port, base, protocolFactory, _1), "Test");
+ client->testString(tr1::bind(testString_clientReturn, host, port, base, protocolFactory, std::tr1::placeholders::_1), "Test");
} catch (TException& exn) {
cout << "Error: " << exn.what() << endl;
}
@@ -211,7 +209,7 @@ int main(int argc, char** argv) {
shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel(host.c_str(), "/", host.c_str(), port, base));
ThriftTestCobClient* client = new ThriftTestCobClient(channel, protocolFactory.get());
- client->testVoid(tr1::bind(testVoid_clientReturn, host.c_str(), port, base, protocolFactory.get(), _1));
+ client->testVoid(tr1::bind(testVoid_clientReturn, host.c_str(), port, base, protocolFactory.get(), std::tr1::placeholders::_1));
event_base_loop(base, 0);
return 0;
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 4afbef68a..456577f8d 100755
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -21,7 +21,7 @@
#include <inttypes.h>
#include <concurrency/ThreadManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <protocol/TJSONProtocol.h>
#include <server/TSimpleServer.h>
@@ -641,8 +641,8 @@ int main(int argc, char **argv) {
shared_ptr<ThreadManager> threadManager =
ThreadManager::newSimpleThreadManager(workers);
- shared_ptr<PosixThreadFactory> threadFactory =
- shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory =
+ shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
threadManager->threadFactory(threadFactory);
diff --git a/test/threads/ThreadsClient.cpp b/test/threads/ThreadsClient.cpp
index 85274a632..f5c076f2e 100644
--- a/test/threads/ThreadsClient.cpp
+++ b/test/threads/ThreadsClient.cpp
@@ -25,9 +25,9 @@
#include <server/TThreadPoolServer.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/ThreadManager.h>
-#include <thrift/concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PlatformThreadFactory.h>
using boost::shared_ptr;
using namespace apache::thrift;
diff --git a/test/threads/ThreadsServer.cpp b/test/threads/ThreadsServer.cpp
index 8734ee893..8420c2f06 100644
--- a/test/threads/ThreadsServer.cpp
+++ b/test/threads/ThreadsServer.cpp
@@ -26,9 +26,9 @@
#include <server/TThreadedServer.h>
#include <transport/TServerSocket.h>
#include <transport/TTransportUtils.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/ThreadManager.h>
-#include <thrift/concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PlatformThreadFactory.h>
using boost::shared_ptr;
using namespace apache::thrift;
@@ -111,8 +111,8 @@ int main(int argc, char **argv) {
/*
shared_ptr<ThreadManager> threadManager =
ThreadManager::newSimpleThreadManager(10);
- shared_ptr<PosixThreadFactory> threadFactory =
- shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+ shared_ptr<PlatformThreadFactory> threadFactory =
+ shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
threadManager->threadFactory(threadFactory);
threadManager->start();