// Copyright 2019 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ #include #include #include #include #include #include "base/base_export.h" #include "base/callback.h" #include "base/synchronization/condition_variable.h" #include "base/task/common/checked_lock.h" #include "base/task/post_job.h" #include "base/task/task_traits.h" #include "base/task/thread_pool/task.h" #include "base/task/thread_pool/task_source.h" #include "base/task/thread_pool/task_source_sort_key.h" namespace base { namespace internal { class PooledTaskRunnerDelegate; // A JobTaskSource generates many Tasks from a single RepeatingClosure. // // Derived classes control the intended concurrency with GetMaxConcurrency(). class BASE_EXPORT JobTaskSource : public TaskSource { public: JobTaskSource(const Location& from_here, const TaskTraits& traits, RepeatingCallback worker_task, MaxConcurrencyCallback max_concurrency_callback, PooledTaskRunnerDelegate* delegate); JobTaskSource(const JobTaskSource&) = delete; JobTaskSource& operator=(const JobTaskSource&) = delete; static JobHandle CreateJobHandle( scoped_refptr task_source) { return JobHandle(std::move(task_source)); } // Notifies this task source that max concurrency was increased, and the // number of worker should be adjusted. void NotifyConcurrencyIncrease(); // Informs this JobTaskSource that the current thread would like to join and // contribute to running |worker_task|. Returns true if the joining thread can // contribute (RunJoinTask() can be called), or false if joining was completed // and all other workers returned because either there's no work remaining or // Job was cancelled. bool WillJoin(); // Contributes to running |worker_task| and returns true if the joining thread // can contribute again (RunJoinTask() can be called again), or false if // joining was completed and all other workers returned because either there's // no work remaining or Job was cancelled. This should be called only after // WillJoin() or RunJoinTask() previously returned true. bool RunJoinTask(); // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() // to return RunStatus::kDisallowed. void Cancel(TaskSource::Transaction* transaction = nullptr); // TaskSource: ExecutionEnvironment GetExecutionEnvironment() override; size_t GetRemainingConcurrency() const override; TaskSourceSortKey GetSortKey( bool disable_fair_scheduling = false) const override; bool IsActive() const; size_t GetWorkerCount() const; // Returns the maximum number of tasks from this TaskSource that can run // concurrently. size_t GetMaxConcurrency() const; uint8_t AcquireTaskId(); void ReleaseTaskId(uint8_t task_id); // Returns true if a worker should return from the worker task on the current // thread ASAP. bool ShouldYield(); PooledTaskRunnerDelegate* delegate() const { return delegate_; } private: // Atomic internal state to track the number of workers running a task from // this JobTaskSource and whether this JobTaskSource is canceled. All // operations are performed with std::memory_order_relaxed as State is only // ever modified under a lock or read atomically (optimistic read). class State { public: static constexpr size_t kCanceledMask = 1; static constexpr size_t kWorkerCountBitOffset = 1; static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset; struct Value { size_t worker_count() const { return value >> kWorkerCountBitOffset; } // Returns true if canceled. bool is_canceled() const { return value & kCanceledMask; } uint32_t value; }; State(); ~State(); // Sets as canceled. Returns the state // before the operation. Value Cancel(); // Increments the worker count by 1. Returns the state before the operation. Value IncrementWorkerCount(); // Decrements the worker count by 1. Returns the state before the operation. Value DecrementWorkerCount(); // Loads and returns the state. Value Load() const; private: std::atomic value_{0}; }; // Atomic flag that indicates if the joining thread is currently waiting on // another worker to yield or to signal. class JoinFlag { public: static constexpr uint32_t kNotWaiting = 0; static constexpr uint32_t kWaitingForWorkerToSignal = 1; static constexpr uint32_t kWaitingForWorkerToYield = 3; // kWaitingForWorkerToYield is 3 because the impl relies on the following // property. static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == kWaitingForWorkerToSignal, ""); JoinFlag(); ~JoinFlag(); // Returns true if the status is not kNotWaiting, using // std::memory_order_relaxed. bool IsWaiting() { return value_.load(std::memory_order_relaxed) != kNotWaiting; } // Resets the status as kNotWaiting using std::memory_order_relaxed. void Reset(); // Sets the status as kWaitingForWorkerToYield using // std::memory_order_relaxed. void SetWaiting(); // If the flag is kWaitingForWorkerToYield, returns true indicating that the // worker should yield, and atomically updates to kWaitingForWorkerToSignal // (using std::memory_order_relaxed) to ensure that a single worker yields // in response to SetWaiting(). bool ShouldWorkerYield(); // If the flag is kWaiting*, returns true indicating that the worker should // signal, and atomically updates to kNotWaiting (using // std::memory_order_relaxed) to ensure that a single worker signals in // response to SetWaiting(). bool ShouldWorkerSignal(); private: std::atomic value_{kNotWaiting}; }; ~JobTaskSource() override; // Called from the joining thread. Waits for the worker count to be below or // equal to max concurrency (will happen when a worker calls // DidProcessTask()). Returns true if the joining thread should run a task, or // false if joining was completed and all other workers returned because // either there's no work remaining or Job was cancelled. bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_); size_t GetMaxConcurrency(size_t worker_count) const; // TaskSource: RunStatus WillRunTask() override; Task TakeTask(TaskSource::Transaction* transaction) override; Task Clear(TaskSource::Transaction* transaction) override; bool DidProcessTask(TaskSource::Transaction* transaction) override; // Synchronizes access to workers state. mutable CheckedLock worker_lock_{UniversalSuccessor()}; // Current atomic state (atomic despite the lock to allow optimistic reads // and cancellation without the lock). State state_ GUARDED_BY(worker_lock_); // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() // hence the use of atomics. JoinFlag join_flag_ GUARDED_BY(worker_lock_); // Signaled when |join_flag_| is kWaiting* and a worker returns. std::unique_ptr worker_released_condition_ GUARDED_BY(worker_lock_); std::atomic assigned_task_ids_{0}; const Location from_here_; RepeatingCallback max_concurrency_callback_; // Worker task set by the job owner. RepeatingCallback worker_task_; // Task returned from TakeTask(), that calls |worker_task_| internally. RepeatingClosure primary_task_; const TimeTicks ready_time_; PooledTaskRunnerDelegate* delegate_; }; } // namespace internal } // namespace base #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_