Diffstat (limited to 'libstdc++-v3/include/parallel/workstealing.h')
-rw-r--r--  libstdc++-v3/include/parallel/workstealing.h  435
1 file changed, 219 insertions(+), 216 deletions(-)
diff --git a/libstdc++-v3/include/parallel/workstealing.h b/libstdc++-v3/include/parallel/workstealing.h
index 638057ca740..9e0db3a9584 100644
--- a/libstdc++-v3/include/parallel/workstealing.h
+++ b/libstdc++-v3/include/parallel/workstealing.h
@@ -49,261 +49,264 @@ namespace __gnu_parallel
#define _GLIBCXX_JOB_VOLATILE volatile
-/** @brief One __job for a certain thread. */
-template<typename _DifferenceTp>
- struct _Job
- {
- typedef _DifferenceTp _DifferenceType;
-
- /** @brief First element.
- *
- * Changed by owning and stealing thread. By stealing thread,
- * always incremented. */
- _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first;
-
- /** @brief Last element.
- *
- * Changed by owning thread only. */
- _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last;
-
- /** @brief Number of elements, i.e. @__c _M_last-_M_first+1.
- *
- * Changed by owning thread only. */
- _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load;
- };
-
-/** @brief Work stealing algorithm for random access iterators.
- *
- * Uses O(1) additional memory. Synchronization at job lists is
- * done with atomic operations.
- * @param __begin Begin iterator of element sequence.
- * @param __end End iterator of element sequence.
- * @param __op User-supplied functor (comparator, predicate, adding
- * functor, ...).
- * @param __f Functor to "process" an element with __op (depends on
- * desired functionality, e. g. for std::for_each(), ...).
- * @param __r Functor to "add" a single __result to the already
- * processed elements (depends on functionality).
- * @param __base Base value for reduction.
- * @param __output Pointer to position where final result is written to
- * @param __bound Maximum number of elements processed (e. g. for
- * std::count_n()).
- * @return User-supplied functor (that may contain a part of the result).
- */
-template<typename _RAIter,
- typename _Op,
- typename _Fu,
- typename _Red,
- typename _Result>
- _Op
- __for_each_template_random_access_workstealing(
- _RAIter __begin, _RAIter __end, _Op __op, _Fu& __f, _Red __r,
- _Result __base, _Result& __output,
- typename std::iterator_traits<_RAIter>::difference_type __bound)
- {
- _GLIBCXX_CALL(__end - __begin)
-
- typedef std::iterator_traits<_RAIter> _TraitsType;
- typedef typename _TraitsType::difference_type _DifferenceType;
-
- const _Settings& __s = _Settings::get();
-
- _DifferenceType __chunk_size =
- static_cast<_DifferenceType>(__s.workstealing_chunk_size);
-
- // How many jobs?
- _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound;
-
- // To avoid false sharing in a cache line.
- const int __stride =
- __s.cache_line_size * 10 / sizeof(_Job<_DifferenceType>) + 1;
-
- // Total number of threads currently working.
- _ThreadIndex __busy = 0;
-
- _Job<_DifferenceType> *__job;
-
- omp_lock_t __output_lock;
- omp_init_lock(&__output_lock);
-
- // Write base value to output.
- __output = __base;
-
- // No more threads than jobs, at least one thread.
- _ThreadIndex __num_threads =
- __gnu_parallel::max<_ThreadIndex>(1,
- __gnu_parallel::min<_DifferenceType>(__length, __get_max_threads()));
-
-# pragma omp parallel shared(__busy) num_threads(__num_threads)
+ /** @brief One __job for a certain thread. */
+ template<typename _DifferenceTp>
+ struct _Job
+ {
+ typedef _DifferenceTp _DifferenceType;
+
+ /** @brief First element.
+ *
+ * Changed by owning and stealing thread. By stealing thread,
+ * always incremented. */
+ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first;
+
+ /** @brief Last element.
+ *
+ * Changed by owning thread only. */
+ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last;
+
+ /** @brief Number of elements, i.e. @c _M_last-_M_first+1.
+ *
+ * Changed by owning thread only. */
+ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load;
+ };
+
+ /** @brief Work stealing algorithm for random access iterators.
+ *
+ * Uses O(1) additional memory. Synchronization at job lists is
+ * done with atomic operations.
+ * @param __begin Begin iterator of element sequence.
+ * @param __end End iterator of element sequence.
+ * @param __op User-supplied functor (comparator, predicate, adding
+ * functor, ...).
+ * @param __f Functor to "process" an element with __op (depends on
+ * desired functionality, e. g. for std::for_each(), ...).
+ * @param __r Functor to "add" a single __result to the already
+ * processed elements (depends on functionality).
+ * @param __base Base value for reduction.
+ * @param __output Pointer to position where final result is written to
+ * @param __bound Maximum number of elements processed (e. g. for
+ * std::count_n()).
+ * @return User-supplied functor (that may contain a part of the result).
+ */
+ template<typename _RAIter,
+ typename _Op,
+ typename _Fu,
+ typename _Red,
+ typename _Result>
+ _Op
+ __for_each_template_random_access_workstealing(_RAIter __begin,
+ _RAIter __end, _Op __op,
+ _Fu& __f, _Red __r,
+ _Result __base,
+ _Result& __output,
+ typename std::iterator_traits<_RAIter>::difference_type __bound)
+ {
+ _GLIBCXX_CALL(__end - __begin)
+
+ typedef std::iterator_traits<_RAIter> _TraitsType;
+ typedef typename _TraitsType::difference_type _DifferenceType;
+
+ const _Settings& __s = _Settings::get();
+
+ _DifferenceType __chunk_size =
+ static_cast<_DifferenceType>(__s.workstealing_chunk_size);
+
+ // How many jobs?
+ _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound;
+
+ // To avoid false sharing in a cache line.
+ const int __stride = (__s.cache_line_size * 10
+ / sizeof(_Job<_DifferenceType>) + 1);
+
+ // Total number of threads currently working.
+ _ThreadIndex __busy = 0;
+
+ _Job<_DifferenceType> *__job;
+
+ omp_lock_t __output_lock;
+ omp_init_lock(&__output_lock);
+
+ // Write base value to output.
+ __output = __base;
+
+ // No more threads than jobs, at least one thread.
+ _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex>
+ (1, __gnu_parallel::min<_DifferenceType>(__length,
+ __get_max_threads()));
+
+# pragma omp parallel shared(__busy) num_threads(__num_threads)
{
-
# pragma omp single
- {
- __num_threads = omp_get_num_threads();
+ {
+ __num_threads = omp_get_num_threads();
- // Create job description array.
- __job = new _Job<_DifferenceType>[__num_threads * __stride];
- }
+ // Create job description array.
+ __job = new _Job<_DifferenceType>[__num_threads * __stride];
+ }
- // Initialization phase.
+ // Initialization phase.
- // Flags for every thread if it is doing productive work.
- bool __iam_working = false;
+ // Flags for every thread if it is doing productive work.
+ bool __iam_working = false;
- // Thread id.
- _ThreadIndex __iam = omp_get_thread_num();
+ // Thread id.
+ _ThreadIndex __iam = omp_get_thread_num();
- // This job.
- _Job<_DifferenceType>& __my_job = __job[__iam * __stride];
+ // This job.
+ _Job<_DifferenceType>& __my_job = __job[__iam * __stride];
- // Random number (for work stealing).
- _ThreadIndex __victim;
+ // Random number (for work stealing).
+ _ThreadIndex __victim;
- // Local value for reduction.
- _Result __result = _Result();
+ // Local value for reduction.
+ _Result __result = _Result();
- // Number of elements to steal in one attempt.
- _DifferenceType __steal;
+ // Number of elements to steal in one attempt.
+ _DifferenceType __steal;
- // Every thread has its own random number generator
- // (modulo __num_threads).
- _RandomNumber rand_gen(__iam, __num_threads);
+ // Every thread has its own random number generator
+ // (modulo __num_threads).
+ _RandomNumber __rand_gen(__iam, __num_threads);
- // This thread is currently working.
+ // This thread is currently working.
# pragma omp atomic
- ++__busy;
+ ++__busy;
- __iam_working = true;
+ __iam_working = true;
- // How many jobs per thread? last thread gets the rest.
- __my_job._M_first =
- static_cast<_DifferenceType>(__iam * (__length / __num_threads));
+ // How many jobs per thread? last thread gets the rest.
+ __my_job._M_first = static_cast<_DifferenceType>
+ (__iam * (__length / __num_threads));
- __my_job._M_last = (__iam == (__num_threads - 1)) ?
- (__length - 1) : ((__iam + 1) * (__length / __num_threads) - 1);
- __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
+ __my_job._M_last = (__iam == (__num_threads - 1)
+ ? (__length - 1)
+ : ((__iam + 1) * (__length / __num_threads) - 1));
+ __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
- // Init result with _M_first value (to have a base value for reduction)
- if (__my_job._M_first <= __my_job._M_last)
- {
- // Cannot use volatile variable directly.
- _DifferenceType __my_first = __my_job._M_first;
- __result = __f(__op, __begin + __my_first);
- ++__my_job._M_first;
- --__my_job._M_load;
- }
+ // Init result with _M_first value (to have a base value for reduction)
+ if (__my_job._M_first <= __my_job._M_last)
+ {
+ // Cannot use volatile variable directly.
+ _DifferenceType __my_first = __my_job._M_first;
+ __result = __f(__op, __begin + __my_first);
+ ++__my_job._M_first;
+ --__my_job._M_load;
+ }
- _RAIter __current;
+ _RAIter __current;
# pragma omp barrier
- // Actual work phase
- // Work on own or stolen current start
- while (__busy > 0)
- {
- // Work until no productive thread left.
+ // Actual work phase
+ // Work on own or stolen current start
+ while (__busy > 0)
+ {
+ // Work until no productive thread left.
# pragma omp flush(__busy)
- // Thread has own work to do
- while (__my_job._M_first <= __my_job._M_last)
- {
- // fetch-and-add call
- // Reserve current job block (size __chunk_size) in my queue.
- _DifferenceType __current_job =
- __fetch_and_add<_DifferenceType>(
- &(__my_job._M_first), __chunk_size);
-
- // Update _M_load, to make the three values consistent,
- // _M_first might have been changed in the meantime
- __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
- for (_DifferenceType __job_counter = 0;
- __job_counter < __chunk_size
- && __current_job <= __my_job._M_last;
- ++__job_counter)
- {
- // Yes: process it!
- __current = __begin + __current_job;
- ++__current_job;
-
- // Do actual work.
- __result = __r(__result, __f(__op, __current));
- }
+ // Thread has own work to do
+ while (__my_job._M_first <= __my_job._M_last)
+ {
+ // fetch-and-add call
+ // Reserve current job block (size __chunk_size) in my queue.
+ _DifferenceType __current_job =
+ __fetch_and_add<_DifferenceType>(&(__my_job._M_first),
+ __chunk_size);
+
+ // Update _M_load, to make the three values consistent,
+ // _M_first might have been changed in the meantime
+ __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
+ for (_DifferenceType __job_counter = 0;
+ __job_counter < __chunk_size
+ && __current_job <= __my_job._M_last;
+ ++__job_counter)
+ {
+ // Yes: process it!
+ __current = __begin + __current_job;
+ ++__current_job;
+
+ // Do actual work.
+ __result = __r(__result, __f(__op, __current));
+ }
# pragma omp flush(__busy)
- }
+ }
- // After reaching this point, a thread's __job list is empty.
- if (__iam_working)
- {
- // This thread no longer has work.
+ // After reaching this point, a thread's __job list is empty.
+ if (__iam_working)
+ {
+ // This thread no longer has work.
# pragma omp atomic
- --__busy;
+ --__busy;
- __iam_working = false;
- }
+ __iam_working = false;
+ }
- _DifferenceType __supposed_first, __supposed_last, __supposed_load;
- do
- {
- // Find random nonempty deque (not own), do consistency check.
- __yield();
+ _DifferenceType __supposed_first, __supposed_last,
+ __supposed_load;
+ do
+ {
+ // Find random nonempty deque (not own), do consistency check.
+ __yield();
# pragma omp flush(__busy)
- __victim = rand_gen();
- __supposed_first = __job[__victim * __stride]._M_first;
- __supposed_last = __job[__victim * __stride]._M_last;
- __supposed_load = __job[__victim * __stride]._M_load;
- }
- while (__busy > 0
- && ((__supposed_load <= 0)
- || ((__supposed_first + __supposed_load - 1)
- != __supposed_last)));
-
- if (__busy == 0)
- break;
-
- if (__supposed_load > 0)
- {
- // Has work and work to do.
- // Number of elements to steal (at least one).
- __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2;
-
- // Push __victim's current start forward.
- _DifferenceType __stolen_first =
- __fetch_and_add<_DifferenceType>(
- &(__job[__victim * __stride]._M_first), __steal);
- _DifferenceType __stolen_try =
- __stolen_first + __steal - _DifferenceType(1);
-
- __my_job._M_first = __stolen_first;
- __my_job._M_last =
- __gnu_parallel::min(__stolen_try, __supposed_last);
- __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
-
- // Has potential work again.
+ __victim = __rand_gen();
+ __supposed_first = __job[__victim * __stride]._M_first;
+ __supposed_last = __job[__victim * __stride]._M_last;
+ __supposed_load = __job[__victim * __stride]._M_load;
+ }
+ while (__busy > 0
+ && ((__supposed_load <= 0)
+ || ((__supposed_first + __supposed_load - 1)
+ != __supposed_last)));
+
+ if (__busy == 0)
+ break;
+
+ if (__supposed_load > 0)
+ {
+ // Has work and work to do.
+ // Number of elements to steal (at least one).
+ __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2;
+
+ // Push __victim's current start forward.
+ _DifferenceType __stolen_first =
+ __fetch_and_add<_DifferenceType>
+ (&(__job[__victim * __stride]._M_first), __steal);
+ _DifferenceType __stolen_try = (__stolen_first + __steal
+ - _DifferenceType(1));
+
+ __my_job._M_first = __stolen_first;
+ __my_job._M_last = __gnu_parallel::min(__stolen_try,
+ __supposed_last);
+ __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
+
+ // Has potential work again.
# pragma omp atomic
- ++__busy;
- __iam_working = true;
+ ++__busy;
+ __iam_working = true;
# pragma omp flush(__busy)
- }
+ }
# pragma omp flush(__busy)
- } // end while __busy > 0
- // Add accumulated result to output.
- omp_set_lock(&__output_lock);
- __output = __r(__output, __result);
- omp_unset_lock(&__output_lock);
+ } // end while __busy > 0
+ // Add accumulated result to output.
+ omp_set_lock(&__output_lock);
+ __output = __r(__output, __result);
+ omp_unset_lock(&__output_lock);
}
- delete[] __job;
+ delete[] __job;
- // Points to last element processed (needed as return value for
- // some algorithms like transform)
- __f._M_finish_iterator = __begin + __length;
+ // Points to last element processed (needed as return value for
+ // some algorithms like transform)
+ __f._M_finish_iterator = __begin + __length;
- omp_destroy_lock(&__output_lock);
+ omp_destroy_lock(&__output_lock);
- return __op;
- }
+ return __op;
+ }
} // end namespace
#endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */
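Background on the stride used above: the job descriptors of all threads live in one array, and `__stride = __s.cache_line_size * 10 / sizeof(_Job<_DifferenceType>) + 1` spreads them out so that the frequently written `_M_first` fields of different threads never share a cache line. As a worked example (assuming a 64-byte cache line and an LP64 difference type, so sizeof(_Job<ptrdiff_t>) is 3 * 8 = 24 bytes): __stride = 640 / 24 + 1 = 27, so consecutive threads' descriptors start 27 * 24 = 648 bytes apart, a bit more than ten cache lines, which keeps owner updates and thief fetch-and-adds on different lines.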
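For readers unfamiliar with the scheme the routine implements: each thread owns a job descriptor [_M_first, _M_last], reserves chunks from its own range with a fetch-and-add on _M_first, and once its range is empty it steals from a random victim the same way, while an atomic __busy counter tracks termination. The standalone sketch below (not the library's code) illustrates that chunk-reservation and stealing pattern under simplifying assumptions: std::atomic replaces the volatile fields, __fetch_and_add() and the "#pragma omp flush" machinery; each job's range is fixed and stolen chunks are processed directly instead of being re-installed into the thief's own descriptor (the real code steals half of the victim's remaining load and makes it stealable again); and a global counter of unprocessed elements replaces the __busy bookkeeping. Compile with g++ -fopenmp.

    #include <algorithm>
    #include <atomic>
    #include <cstdio>
    #include <random>
    #include <thread>
    #include <vector>
    #include <omp.h>

    struct Job                          // cf. _Job<_DifferenceType>
    {
      std::atomic<long> first;          // next unreserved index; advanced by owner and thieves
      long last;                        // fixed last index of this job's range
    };

    int main()
    {
      const long n = 1000000;
      const long chunk = 100;           // cf. _Settings::workstealing_chunk_size
      std::vector<int> data(n, 1);

      std::atomic<long> sum(0);         // reduction target, cf. __output / __output_lock
      std::atomic<long> remaining(n);   // unprocessed elements; replaces the __busy counter

      const int threads = omp_get_max_threads();
      std::vector<Job> jobs(threads);
      for (int t = 0; t < threads; ++t)
        {
          jobs[t].first = t * (n / threads);
          jobs[t].last = (t == threads - 1) ? n - 1 : (t + 1) * (n / threads) - 1;
        }

    #pragma omp parallel num_threads(threads)
      {
        const int iam = omp_get_thread_num();
        std::mt19937 rng(iam);          // per-thread generator, cf. _RandomNumber
        long local = 0;                 // thread-local partial result, cf. __result

        // Reserve chunks from job j via fetch-and-add (cf. __fetch_and_add on
        // _M_first), process them, and return how many elements were processed.
        auto work_on = [&](Job& j) -> long
        {
          long done = 0;
          for (;;)
            {
              long begin = j.first.fetch_add(chunk);
              if (begin > j.last)
                break;                  // nothing left in this range
              long end = std::min(begin + chunk - 1, j.last);
              for (long i = begin; i <= end; ++i)
                local += data[i];       // per-element step, cf. __f(__op, __current)
              done += end - begin + 1;
            }
          return done;
        };

        remaining -= work_on(jobs[iam]);        // drain the own range first

        while (remaining.load() > 0)            // then steal until everything is done
          {
            std::this_thread::yield();          // cf. __yield()
            int victim = rng() % threads;
            if (victim != iam)
              remaining -= work_on(jobs[victim]);
          }

        sum += local;                           // merge partial results, cf. __r
      }

      std::printf("sum = %ld (expected %ld)\n", sum.load(), n);
      return 0;
    }

Because every reservation on a given job's counter is a fetch-and-add, the reserved intervals are disjoint and tile the range, so each element is processed exactly once; the per-thread partial results are then merged into the shared total, which is what the __output_lock protects in the header.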
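As for how user code reaches this routine: as far as I can tell from the parallel-mode headers (<parallel/algorithm>, <parallel/types.h>, <parallel/for_each.h>), the algorithm overloads in namespace std::__parallel accept an optional __gnu_parallel::_Parallelism tag, and the "balanced" tag is the one the for_each dispatcher routes to __for_each_template_random_access_workstealing(). The sketch below is based on that reading; the exact overloads and enumerator names may differ between GCC releases. Compile with g++ -fopenmp.

    #include <parallel/algorithm>
    #include <vector>

    int main()
    {
      std::vector<int> v(1000000, 1);

      // Request the "balanced" (work-stealing) strategy explicitly.
      std::__parallel::for_each(v.begin(), v.end(),
                                [](int& x) { x *= 2; },
                                __gnu_parallel::parallel_balanced);
      return 0;
    }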