/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "PosixThreadFactory.h"
#include "Exception.h"

#if GOOGLE_PERFTOOLS_REGISTER_THREAD
#  include <google/profiler.h>
#endif

#include <assert.h>
#include <pthread.h>

#include <iostream>

#include <boost/weak_ptr.hpp>

namespace apache { namespace thrift { namespace concurrency {

using boost::shared_ptr;
using boost::weak_ptr;

/**
 * The POSIX thread class.
 *
 * Wraps a single pthread. The object keeps a weak reference to itself
 * (see weakRef()); start() turns that into a strong reference which is
 * handed to the new thread, so the object stays alive for as long as
 * threadMain() is running.
 *
 * @version $Id:$
 */
class PthreadThread: public Thread {
 public:

  enum STATE {
    uninitialized,
    starting,
    started,
    stopping,
    stopped
  };

  /** Number of bytes in a megabyte; stackSize_ is multiplied by this. */
  static const int MB = 1024 * 1024;

  /**
   * Entry point passed to pthread_create().
   *
   * @param arg Heap-allocated shared_ptr<PthreadThread>*; ownership is
   *            transferred to this function, which deletes it.
   */
  static void* threadMain(void* arg);

 private:
  pthread_t pthread_;
  STATE state_;
  int policy_;
  int priority_;
  int stackSize_;
  weak_ptr<PthreadThread> self_;
  bool detached_;

  /**
   * Calls pthread_attr_destroy() on the wrapped attribute object when it
   * goes out of scope, so start() releases it on every exit path,
   * including the ones that throw.
   */
  struct AttrGuard {
    explicit AttrGuard(pthread_attr_t* attr) : attr_(attr) {}
    ~AttrGuard() { pthread_attr_destroy(attr_); }
    pthread_attr_t* attr_;
  };

 public:

  /**
   * @param policy    pthread scheduling policy (SCHED_* value)
   * @param priority  absolute scheduling priority for that policy
   * @param stackSize stack size, in megabytes
   * @param detached  create the thread detached rather than joinable
   * @param runnable  object whose run() is invoked on the new thread
   */
  PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
    pthread_(0),
    state_(uninitialized),
    policy_(policy),
    priority_(priority),
    stackSize_(stackSize),
    detached_(detached) {

    this->Thread::runnable(runnable);
  }

  ~PthreadThread() {
    /* Nothing references this thread, if it is not detached, do a join
       now, otherwise the thread-id and, possibly, other resources will
       be leaked. */
    if (!detached_) {
      try {
        join();
      } catch (...) {
        // We're really hosed. A destructor must not throw, so the
        // failure is deliberately swallowed.
      }
    }
  }

  /**
   * Creates and starts the underlying pthread. Does nothing if the
   * thread has already been started.
   *
   * @throws SystemResourceException if any pthread call fails. If
   *         pthread_create() itself fails the object is returned to the
   *         uninitialized state.
   */
  void start() {
    if (state_ != uninitialized) {
      return;
    }

    pthread_attr_t thread_attr;
    if (pthread_attr_init(&thread_attr) != 0) {
      throw SystemResourceException("pthread_attr_init failed");
    }
    // From here on the attribute object is destroyed on every exit path.
    AttrGuard attrGuard(&thread_attr);

    if (pthread_attr_setdetachstate(&thread_attr,
                                    detached_ ?
                                    PTHREAD_CREATE_DETACHED :
                                    PTHREAD_CREATE_JOINABLE) != 0) {
      throw SystemResourceException("pthread_attr_setdetachstate failed");
    }

    // Set thread stack size
    if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
      throw SystemResourceException("pthread_attr_setstacksize failed");
    }

    // Set thread policy
    if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
      throw SystemResourceException("pthread_attr_setschedpolicy failed");
    }

    struct sched_param sched_param;
    sched_param.sched_priority = priority_;

    // Set thread priority
    if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
      throw SystemResourceException("pthread_attr_setschedparam failed");
    }

    // Create reference. The strong reference lives on the heap so that it
    // can be handed through pthread_create()'s void* argument;
    // threadMain() takes ownership and deletes it.
    shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
    *selfRef = self_.lock();

    // Must be set before pthread_create(): threadMain() bails out unless
    // it observes this state.
    state_ = starting;

    if (pthread_create(&pthread_, &thread_attr, threadMain, static_cast<void*>(selfRef)) != 0) {
      // No thread exists to take ownership of selfRef; release it so the
      // object is not kept alive forever, and undo the state change so
      // join() does not try to join a thread that was never created.
      state_ = uninitialized;
      delete selfRef;
      throw SystemResourceException("pthread_create failed");
    }
  }

  /**
   * Waits for the thread to finish. Does nothing for a detached thread or
   * one that was never started.
   */
  void join() {
    if (!detached_ && state_ != uninitialized) {
      void* ignore;
      /* XXX
         If join fails it is most likely due to the fact
         that the last reference was the thread itself and cannot
         join.  This results in leaked threads and will eventually
         cause the process to run out of thread resources.
         We're beyond the point of throwing an exception.  Not clear how
         best to handle this. */
      // After a successful join the thread is flagged as detached so that
      // later join() calls (including the one in the destructor) are
      // no-ops.
      detached_ = pthread_join(pthread_, &ignore) == 0;
    }
  }

  /** @return the pthread handle converted to Thread::id_t. */
  Thread::id_t getId() {
    return (Thread::id_t)pthread_;
  }

  /** @return the runnable stored in the Thread base class. */
  shared_ptr<Runnable> runnable() const { return Thread::runnable(); }

  /** Stores the runnable in the Thread base class. */
  void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }

  /**
   * Records a weak reference to this object; start() locks it to obtain
   * the strong reference handed to the new thread.
   *
   * @param self shared pointer that owns this object
   */
  void weakRef(shared_ptr<PthreadThread> self) {
    assert(self.get() == this);
    self_ = weak_ptr<PthreadThread>(self);
  }
};

void* PthreadThread::threadMain(void* arg) {
  // Take over the strong reference created by start() and free its heap
  // holder immediately; the local copy keeps the thread object alive for
  // the duration of this function.
  shared_ptr<PthreadThread>* selfRef = static_cast<shared_ptr<PthreadThread>*>(arg);
  shared_ptr<PthreadThread> thread = *selfRef;
  delete selfRef;

  if (!thread) {
    return (void*)0;
  }

  if (thread->state_ != starting) {
    return (void*)0;
  }

#if GOOGLE_PERFTOOLS_REGISTER_THREAD
  ProfilerRegisterThread();
#endif

  // The thread is now actually running its payload.
  thread->state_ = started;
  thread->runnable()->run();
  if (thread->state_ != stopping && thread->state_ != stopped) {
    thread->state_ = stopping;
  }

  return (void*)0;
}

/**
 * POSIX Thread factory implementation
 */
class PosixThreadFactory::Impl {

 private:
  POLICY policy_;
  PRIORITY priority_;
  int stackSize_;
  bool detached_;

  /**
   * Converts generic posix thread schedule policy enums into pthread
   * API values. Unrecognized values map to SCHED_OTHER.
   */
  static int toPthreadPolicy(POLICY policy) {
    switch (policy) {
    case OTHER:
      return SCHED_OTHER;
    case FIFO:
      return SCHED_FIFO;
    case ROUND_ROBIN:
      return SCHED_RR;
    }
    return SCHED_OTHER;
  }

  /**
   * Converts relative thread priorities to absolute value based on posix
   * thread scheduler policy
   *
   * The idea is simply to divide up the priority range for the given policy
   * into the corresponding relative priority level (lowest..highest) and
   * then pro-rate accordingly.
   *
   * @param policy   generic scheduling policy
   * @param priority relative priority; values above HIGHEST are not
   *                 expected and are treated as NORMAL (after asserting)
   * @return absolute priority for the corresponding pthread policy
   */
  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
    const int pthread_policy = toPthreadPolicy(policy);
    const int min_priority = sched_get_priority_min(pthread_policy);
    const int max_priority = sched_get_priority_max(pthread_policy);
    const int quanta = (HIGHEST - LOWEST) + 1;
    // Divide in floating point: integer division would truncate the step
    // size before it is stored and skew every pro-rated priority.
    const float stepsperquanta = static_cast<float>(max_priority - min_priority) / quanta;

    if (priority <= HIGHEST) {
      return (int)(min_priority + stepsperquanta * priority);
    } else {
      // should never get here for priority increments.
      assert(false);
      return (int)(min_priority + stepsperquanta * NORMAL);
    }
  }

 public:

  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
    policy_(policy),
    priority_(priority),
    stackSize_(stackSize),
    detached_(detached) {}

  /**
   * Creates a new POSIX thread to run the runnable object
   *
   * The new thread is given a weak reference to itself and is registered
   * with the runnable through runnable->thread(). It is not started.
   *
   * @param runnable A runnable object
   * @return the newly created, not yet started, thread
   */
  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
    shared_ptr<PthreadThread> result =
      shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
                                                  toPthreadPriority(policy_, priority_),
                                                  stackSize_,
                                                  detached_,
                                                  runnable));
    result->weakRef(result);
    runnable->thread(result);
    return result;
  }

  /** @return stack size, in megabytes, used for new threads. */
  int getStackSize() const { return stackSize_; }

  /** Sets the stack size, in megabytes, used for new threads. */
  void setStackSize(int value) { stackSize_ = value; }

  /** @return relative priority used for new threads. */
  PRIORITY getPriority() const { return priority_; }

  /**
   * Sets priority.
   *
   *  XXX
   *  Need to handle incremental priorities properly.
   */
  void setPriority(PRIORITY value) { priority_ = value; }

  /** @return whether new threads are created detached. */
  bool isDetached() const { return detached_; }

  /** Sets whether new threads are created detached. */
  void setDetached(bool value) { detached_ = value; }

  /**
   * @return the calling thread's pthread handle converted to
   *         Thread::id_t, the same conversion PthreadThread::getId() uses.
   */
  Thread::id_t getCurrentThreadId() const {
    // TODO(dreiss): Stop using C-style casts.
    // The cast names Thread::id_t explicitly so that it matches the
    // declared return type rather than whatever unqualified id_t happens
    // to resolve to in this scope.
    return (Thread::id_t)pthread_self();
  }

};

PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
  impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}

// The remaining members simply forward to the Impl object.

shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }

int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }

void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }

PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }

void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }

bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }

void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }

Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }

}}} // apache::thrift::concurrency