diff options
Diffstat (limited to 'libstdc++-v3/include/parallel/workstealing.h')
-rw-r--r-- | libstdc++-v3/include/parallel/workstealing.h | 435 |
1 files changed, 219 insertions, 216 deletions
diff --git a/libstdc++-v3/include/parallel/workstealing.h b/libstdc++-v3/include/parallel/workstealing.h index 638057ca740..9e0db3a9584 100644 --- a/libstdc++-v3/include/parallel/workstealing.h +++ b/libstdc++-v3/include/parallel/workstealing.h @@ -49,261 +49,264 @@ namespace __gnu_parallel #define _GLIBCXX_JOB_VOLATILE volatile -/** @brief One __job for a certain thread. */ -template<typename _DifferenceTp> - struct _Job - { - typedef _DifferenceTp _DifferenceType; - - /** @brief First element. - * - * Changed by owning and stealing thread. By stealing thread, - * always incremented. */ - _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first; - - /** @brief Last element. - * - * Changed by owning thread only. */ - _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last; - - /** @brief Number of elements, i.e. @__c _M_last-_M_first+1. - * - * Changed by owning thread only. */ - _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load; - }; - -/** @brief Work stealing algorithm for random access iterators. - * - * Uses O(1) additional memory. Synchronization at job lists is - * done with atomic operations. - * @param __begin Begin iterator of element sequence. - * @param __end End iterator of element sequence. - * @param __op User-supplied functor (comparator, predicate, adding - * functor, ...). - * @param __f Functor to "process" an element with __op (depends on - * desired functionality, e. g. for std::for_each(), ...). - * @param __r Functor to "add" a single __result to the already - * processed elements (depends on functionality). - * @param __base Base value for reduction. - * @param __output Pointer to position where final result is written to - * @param __bound Maximum number of elements processed (e. g. for - * std::count_n()). - * @return User-supplied functor (that may contain a part of the result). - */ -template<typename _RAIter, - typename _Op, - typename _Fu, - typename _Red, - typename _Result> - _Op - __for_each_template_random_access_workstealing( - _RAIter __begin, _RAIter __end, _Op __op, _Fu& __f, _Red __r, - _Result __base, _Result& __output, - typename std::iterator_traits<_RAIter>::difference_type __bound) - { - _GLIBCXX_CALL(__end - __begin) - - typedef std::iterator_traits<_RAIter> _TraitsType; - typedef typename _TraitsType::difference_type _DifferenceType; - - const _Settings& __s = _Settings::get(); - - _DifferenceType __chunk_size = - static_cast<_DifferenceType>(__s.workstealing_chunk_size); - - // How many jobs? - _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound; - - // To avoid false sharing in a cache line. - const int __stride = - __s.cache_line_size * 10 / sizeof(_Job<_DifferenceType>) + 1; - - // Total number of threads currently working. - _ThreadIndex __busy = 0; - - _Job<_DifferenceType> *__job; - - omp_lock_t __output_lock; - omp_init_lock(&__output_lock); - - // Write base value to output. - __output = __base; - - // No more threads than jobs, at least one thread. - _ThreadIndex __num_threads = - __gnu_parallel::max<_ThreadIndex>(1, - __gnu_parallel::min<_DifferenceType>(__length, __get_max_threads())); - -# pragma omp parallel shared(__busy) num_threads(__num_threads) + /** @brief One __job for a certain thread. */ + template<typename _DifferenceTp> + struct _Job + { + typedef _DifferenceTp _DifferenceType; + + /** @brief First element. + * + * Changed by owning and stealing thread. By stealing thread, + * always incremented. */ + _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first; + + /** @brief Last element. + * + * Changed by owning thread only. */ + _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last; + + /** @brief Number of elements, i.e. @c _M_last-_M_first+1. + * + * Changed by owning thread only. */ + _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load; + }; + + /** @brief Work stealing algorithm for random access iterators. + * + * Uses O(1) additional memory. Synchronization at job lists is + * done with atomic operations. + * @param __begin Begin iterator of element sequence. + * @param __end End iterator of element sequence. + * @param __op User-supplied functor (comparator, predicate, adding + * functor, ...). + * @param __f Functor to "process" an element with __op (depends on + * desired functionality, e. g. for std::for_each(), ...). + * @param __r Functor to "add" a single __result to the already + * processed elements (depends on functionality). + * @param __base Base value for reduction. + * @param __output Pointer to position where final result is written to + * @param __bound Maximum number of elements processed (e. g. for + * std::count_n()). + * @return User-supplied functor (that may contain a part of the result). + */ + template<typename _RAIter, + typename _Op, + typename _Fu, + typename _Red, + typename _Result> + _Op + __for_each_template_random_access_workstealing(_RAIter __begin, + _RAIter __end, _Op __op, + _Fu& __f, _Red __r, + _Result __base, + _Result& __output, + typename std::iterator_traits<_RAIter>::difference_type __bound) + { + _GLIBCXX_CALL(__end - __begin) + + typedef std::iterator_traits<_RAIter> _TraitsType; + typedef typename _TraitsType::difference_type _DifferenceType; + + const _Settings& __s = _Settings::get(); + + _DifferenceType __chunk_size = + static_cast<_DifferenceType>(__s.workstealing_chunk_size); + + // How many jobs? + _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound; + + // To avoid false sharing in a cache line. + const int __stride = (__s.cache_line_size * 10 + / sizeof(_Job<_DifferenceType>) + 1); + + // Total number of threads currently working. + _ThreadIndex __busy = 0; + + _Job<_DifferenceType> *__job; + + omp_lock_t __output_lock; + omp_init_lock(&__output_lock); + + // Write base value to output. + __output = __base; + + // No more threads than jobs, at least one thread. + _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex> + (1, __gnu_parallel::min<_DifferenceType>(__length, + __get_max_threads())); + +# pragma omp parallel shared(__busy) num_threads(__num_threads) { - # pragma omp single - { - __num_threads = omp_get_num_threads(); + { + __num_threads = omp_get_num_threads(); - // Create job description array. - __job = new _Job<_DifferenceType>[__num_threads * __stride]; - } + // Create job description array. + __job = new _Job<_DifferenceType>[__num_threads * __stride]; + } - // Initialization phase. + // Initialization phase. - // Flags for every thread if it is doing productive work. - bool __iam_working = false; + // Flags for every thread if it is doing productive work. + bool __iam_working = false; - // Thread id. - _ThreadIndex __iam = omp_get_thread_num(); + // Thread id. + _ThreadIndex __iam = omp_get_thread_num(); - // This job. - _Job<_DifferenceType>& __my_job = __job[__iam * __stride]; + // This job. + _Job<_DifferenceType>& __my_job = __job[__iam * __stride]; - // Random number (for work stealing). - _ThreadIndex __victim; + // Random number (for work stealing). + _ThreadIndex __victim; - // Local value for reduction. - _Result __result = _Result(); + // Local value for reduction. + _Result __result = _Result(); - // Number of elements to steal in one attempt. - _DifferenceType __steal; + // Number of elements to steal in one attempt. + _DifferenceType __steal; - // Every thread has its own random number generator - // (modulo __num_threads). - _RandomNumber rand_gen(__iam, __num_threads); + // Every thread has its own random number generator + // (modulo __num_threads). + _RandomNumber __rand_gen(__iam, __num_threads); - // This thread is currently working. + // This thread is currently working. # pragma omp atomic - ++__busy; + ++__busy; - __iam_working = true; + __iam_working = true; - // How many jobs per thread? last thread gets the rest. - __my_job._M_first = - static_cast<_DifferenceType>(__iam * (__length / __num_threads)); + // How many jobs per thread? last thread gets the rest. + __my_job._M_first = static_cast<_DifferenceType> + (__iam * (__length / __num_threads)); - __my_job._M_last = (__iam == (__num_threads - 1)) ? - (__length - 1) : ((__iam + 1) * (__length / __num_threads) - 1); - __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; + __my_job._M_last = (__iam == (__num_threads - 1) + ? (__length - 1) + : ((__iam + 1) * (__length / __num_threads) - 1)); + __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; - // Init result with _M_first value (to have a base value for reduction) - if (__my_job._M_first <= __my_job._M_last) - { - // Cannot use volatile variable directly. - _DifferenceType __my_first = __my_job._M_first; - __result = __f(__op, __begin + __my_first); - ++__my_job._M_first; - --__my_job._M_load; - } + // Init result with _M_first value (to have a base value for reduction) + if (__my_job._M_first <= __my_job._M_last) + { + // Cannot use volatile variable directly. + _DifferenceType __my_first = __my_job._M_first; + __result = __f(__op, __begin + __my_first); + ++__my_job._M_first; + --__my_job._M_load; + } - _RAIter __current; + _RAIter __current; # pragma omp barrier - // Actual work phase - // Work on own or stolen current start - while (__busy > 0) - { - // Work until no productive thread left. + // Actual work phase + // Work on own or stolen current start + while (__busy > 0) + { + // Work until no productive thread left. # pragma omp flush(__busy) - // Thread has own work to do - while (__my_job._M_first <= __my_job._M_last) - { - // fetch-and-add call - // Reserve current job block (size __chunk_size) in my queue. - _DifferenceType __current_job = - __fetch_and_add<_DifferenceType>( - &(__my_job._M_first), __chunk_size); - - // Update _M_load, to make the three values consistent, - // _M_first might have been changed in the meantime - __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; - for (_DifferenceType __job_counter = 0; - __job_counter < __chunk_size - && __current_job <= __my_job._M_last; - ++__job_counter) - { - // Yes: process it! - __current = __begin + __current_job; - ++__current_job; - - // Do actual work. - __result = __r(__result, __f(__op, __current)); - } + // Thread has own work to do + while (__my_job._M_first <= __my_job._M_last) + { + // fetch-and-add call + // Reserve current job block (size __chunk_size) in my queue. + _DifferenceType __current_job = + __fetch_and_add<_DifferenceType>(&(__my_job._M_first), + __chunk_size); + + // Update _M_load, to make the three values consistent, + // _M_first might have been changed in the meantime + __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; + for (_DifferenceType __job_counter = 0; + __job_counter < __chunk_size + && __current_job <= __my_job._M_last; + ++__job_counter) + { + // Yes: process it! + __current = __begin + __current_job; + ++__current_job; + + // Do actual work. + __result = __r(__result, __f(__op, __current)); + } # pragma omp flush(__busy) - } + } - // After reaching this point, a thread's __job list is empty. - if (__iam_working) - { - // This thread no longer has work. + // After reaching this point, a thread's __job list is empty. + if (__iam_working) + { + // This thread no longer has work. # pragma omp atomic - --__busy; + --__busy; - __iam_working = false; - } + __iam_working = false; + } - _DifferenceType __supposed_first, __supposed_last, __supposed_load; - do - { - // Find random nonempty deque (not own), do consistency check. - __yield(); + _DifferenceType __supposed_first, __supposed_last, + __supposed_load; + do + { + // Find random nonempty deque (not own), do consistency check. + __yield(); # pragma omp flush(__busy) - __victim = rand_gen(); - __supposed_first = __job[__victim * __stride]._M_first; - __supposed_last = __job[__victim * __stride]._M_last; - __supposed_load = __job[__victim * __stride]._M_load; - } - while (__busy > 0 - && ((__supposed_load <= 0) - || ((__supposed_first + __supposed_load - 1) - != __supposed_last))); - - if (__busy == 0) - break; - - if (__supposed_load > 0) - { - // Has work and work to do. - // Number of elements to steal (at least one). - __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2; - - // Push __victim's current start forward. - _DifferenceType __stolen_first = - __fetch_and_add<_DifferenceType>( - &(__job[__victim * __stride]._M_first), __steal); - _DifferenceType __stolen_try = - __stolen_first + __steal - _DifferenceType(1); - - __my_job._M_first = __stolen_first; - __my_job._M_last = - __gnu_parallel::min(__stolen_try, __supposed_last); - __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; - - // Has potential work again. + __victim = __rand_gen(); + __supposed_first = __job[__victim * __stride]._M_first; + __supposed_last = __job[__victim * __stride]._M_last; + __supposed_load = __job[__victim * __stride]._M_load; + } + while (__busy > 0 + && ((__supposed_load <= 0) + || ((__supposed_first + __supposed_load - 1) + != __supposed_last))); + + if (__busy == 0) + break; + + if (__supposed_load > 0) + { + // Has work and work to do. + // Number of elements to steal (at least one). + __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2; + + // Push __victim's current start forward. + _DifferenceType __stolen_first = + __fetch_and_add<_DifferenceType> + (&(__job[__victim * __stride]._M_first), __steal); + _DifferenceType __stolen_try = (__stolen_first + __steal + - _DifferenceType(1)); + + __my_job._M_first = __stolen_first; + __my_job._M_last = __gnu_parallel::min(__stolen_try, + __supposed_last); + __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; + + // Has potential work again. # pragma omp atomic - ++__busy; - __iam_working = true; + ++__busy; + __iam_working = true; # pragma omp flush(__busy) - } + } # pragma omp flush(__busy) - } // end while __busy > 0 - // Add accumulated result to output. - omp_set_lock(&__output_lock); - __output = __r(__output, __result); - omp_unset_lock(&__output_lock); + } // end while __busy > 0 + // Add accumulated result to output. + omp_set_lock(&__output_lock); + __output = __r(__output, __result); + omp_unset_lock(&__output_lock); } - delete[] __job; + delete[] __job; - // Points to last element processed (needed as return value for - // some algorithms like transform) - __f._M_finish_iterator = __begin + __length; + // Points to last element processed (needed as return value for + // some algorithms like transform) + __f._M_finish_iterator = __begin + __length; - omp_destroy_lock(&__output_lock); + omp_destroy_lock(&__output_lock); - return __op; - } + return __op; + } } // end namespace #endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */ |