From d443dbee74bd4ece5d2435d9914c7eabb8dd567f Mon Sep 17 00:00:00 2001 From: MikeDvorskiy Date: Wed, 27 Nov 2024 13:03:07 +0100 Subject: [PATCH] [oneDPL][ranges][merge] support size limit for output; + draft for merge path for the host backend (__pattern_merge_2) --- include/oneapi/dpl/pstl/algorithm_impl.h | 136 +++++++++++++++++- .../oneapi/dpl/pstl/algorithm_ranges_impl.h | 92 +----------- .../oneapi/dpl/pstl/parallel_backend_tbb.h | 5 +- 3 files changed, 141 insertions(+), 92 deletions(-) diff --git a/include/oneapi/dpl/pstl/algorithm_impl.h b/include/oneapi/dpl/pstl/algorithm_impl.h index ae9094f721a..1d4e478704c 100644 --- a/include/oneapi/dpl/pstl/algorithm_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_impl.h @@ -2948,6 +2948,40 @@ __pattern_remove_if(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, // merge //------------------------------------------------------------------------ +template +std::pair +__brick_merge(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp) +{ + while(__it_1 != __it_1_e && __it_2 != __it_2_e) + { + if (__comp(*__it_1, *__it_2)) + { + *__it_out = *__it_1; + ++__it_out, ++__it_1; + } + else + { + *__it_out = *__it_2; + ++__it_out, ++__it_2; + } + if(__it_out == __it_out_e) + return {__it_1, __it_2}; + } + + if(__it_1 == __it_1_e) + { + for(; __it_2 != __it_2_e && __it_out != __it_out_e; ++__it_2, ++__it_out) + *__it_out = *__it_2; + } + else + { + //assert(__it_2 == __it_2_e); + for(; __it_1 != __it_1_e && __it_out != __it_out_e; ++__it_1, ++__it_out) + *__it_out = *__it_1; + } + return {__it_1, __it_2}; +} + template _OutputIterator __brick_merge(_ForwardIterator1 __first1, _ForwardIterator1 __last1, _ForwardIterator2 __first2, @@ -2980,13 +3014,106 @@ __pattern_merge(_Tag, _ExecutionPolicy&&, _ForwardIterator1 __first1, _ForwardIt typename _Tag::__is_vector{}); } +template::value_type, + class Compare> +ForwardIt lower_bound_2(ForwardIt first, ForwardIt last, const T& value, Compare comp) +{ + ForwardIt it; + typename std::iterator_traits::difference_type count, step; + count = std::distance(first, last); + + while (count > 0) + { + it = first; + step = count / 2; + std::advance(it, step); + + std::cout << "it: " << *it << " "; + if (comp(*it, value)) + { + first = ++it; + count -= step + 1; + } + else + count = step; + } + + std::cout << "first: " << *first << " "; + std::cout << std::endl; + return first; +} + +template +std::pair<_It1, _It2> +__pattern_merge_2(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, + _Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp) +{ + using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag; + + _It1 __it_res_1; + _It2 __it_res_2; + + __internal::__except_handler([&]() { + __par_backend::__parallel_for(__backend_tag{}, std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out, + [=, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j) + { + //a start merging point on the merge path; for each thread + _Index1 __r = 0; //row index + _Index2 __c = 0; //column index + + if(__i > 0) + { + //calc merge path intersection: + const _Index3 __d_size = std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1; + + auto __get_row = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d - 1; }; + auto __get_column = [__i, __n_1](auto __d) { return std::max<_Index1>(0, __i - __n_1 - 1) + __d + (__i / (__n_1 + 1) > 0 ? 1 : 0); }; + + oneapi::dpl::counting_iterator<_Index3> __it_d(0); + + auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1, + [&](auto __d, auto __val) { + auto __r = __get_row(__d); + auto __c = __get_column(__d); + + oneapi::dpl::__internal::__compare<_Comp, std::identity> __cmp{__comp, std::identity{}}; + const auto __res = (__cmp(__it_1[__r], __it_2[__c]) ? 1 : 0); + + return __res < __val; + } + ); + + //intersection point + __r = __get_row(__res_d); + __c = __get_column(__res_d); + ++__r; //to get a merge matrix ceil, lying on the current diagonal + } + + //serial merge n elements, starting from input x and y, to [i, j) output range + auto __res = __brick_merge(__it_1 + __r, __it_1 + __n_1, + __it_2 + __c, __it_2 + __n_2, + __it_out + __i, __it_out + __j, __comp); + + if(__j == __n_out) + { + __it_res_1 = __res.first; + __it_res_2 = __res.second; + } + }, /*_ONEDPL_MERGE_CUT_OFF*/10); + }); + + return {__it_res_1, __it_res_2}; +} + template _RandomAccessIterator3 -__pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1, +__pattern_merge(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1, _RandomAccessIterator1 __last1, _RandomAccessIterator2 __first2, _RandomAccessIterator2 __last2, _RandomAccessIterator3 __d_first, _Compare __comp) { +#if 0 using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag; return __internal::__except_handler([&]() { @@ -2999,6 +3126,13 @@ __pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _RandomAcc }); return __d_first + (__last1 - __first1) + (__last2 - __first2); }); +#else + auto __n_1 = __last1 - __first1; + auto __n_2 = __last2 - __first2; + auto __n_3 = __n_1 + __n_2; + __pattern_merge_2(__tag, std::forward<_ExecutionPolicy>(__exec), __first1, __n_1, __first2, __n_2, __d_first, __n_3, __comp); + return __d_first + __n_3; +#endif } //------------------------------------------------------------------------ diff --git a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h index ff3f1e65e78..0d64598c1d5 100644 --- a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h @@ -465,48 +465,12 @@ __pattern_merge(_Tag __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _ return __return_type{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + std::ranges::size(__r2), __res}; } -template -std::pair -__brick_merge(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp) -{ - while(__it_1 != __it_1_e && __it_2 != __it_2_e) - { - if (__comp(*__it_1, *__it_2)) - { - *__it_out = *__it_1; - ++__it_out, ++__it_1; - } - else - { - *__it_out = *__it_2; - ++__it_out, ++__it_2; - } - if(__it_out == __it_out_e) - return {__it_1, __it_2}; - } - - if(__it_1 == __it_1_e) - { - for(; __it_2 != __it_2_e && __it_out != __it_out_e; ++__it_2, ++__it_out) - *__it_out = *__it_2; - } - else - { - //assert(__it_2 == __it_2_e); - for(; __it_1 != __it_1_e && __it_out != __it_out_e; ++__it_1, ++__it_out) - *__it_out = *__it_1; - } - return {__it_1, __it_2}; -} - template auto -__pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, +__pattern_merge(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, _Proj1 __proj1, _Proj2 __proj2) { - using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag; - auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp, std::invoke(__proj1, std::forward(__val1)), std::invoke(__proj2, std::forward(__val2)));}; @@ -517,68 +481,18 @@ __pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _R1&& __r1 _Index1 __n_1 = std::ranges::size(__r1); _Index2 __n_2 = std::ranges::size(__r2); - _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r)); auto __it_1 = std::ranges::begin(__r1); auto __it_2 = std::ranges::begin(__r2); auto __it_out = std::ranges::begin(__out_r); - std::ranges::borrowed_iterator_t<_R1> __it_res_1; - std::ranges::borrowed_iterator_t<_R1> __it_res_2; - - __internal::__except_handler([&]() { - __par_backend::__parallel_for(__backend_tag{}, ::std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out, - [=, &__r1, &__r2, &__out_r, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j) - {/*...*/ - - //a start merging point on the merge path; for each thread - std::ranges::range_difference_t<_R1> __x = 0; - std::ranges::range_difference_t<_R2> __y = 0; - - if(__i > 0) - { - //calc merge path intersection: - const _Index3 __d_size = std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1; - - auto __get_x = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d; }; - auto __get_y = [__i, __n_1](auto __d) { return std::max<_Index1>(0, __i - __n_1) + __d; }; - - oneapi::dpl::counting_iterator<_Index3> __it_d(0); - auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1, - [&](auto __d, auto __val) { - auto __x = __get_x(__d); - auto __y = __get_y(__d); - - const auto __res = __comp_2(__r1[__x], __r2[__y]) ? 0 : 1; - return __res < __val; - } - ); - //intersection point - __x = __get_x(__res_d); - __y = __get_y(__res_d); - - } - - const _Index3 __n = __j - __i; - - //serial merge n elements, starting from input x and y, to [i, j) output range - auto __res = __brick_merge(__it_1 + __x, __it_1 + __n_1, - __it_2 + __y, __it_2 + __n_2, - __it_out + __i, __it_out + __j, __comp_2); - - if(__j == __n_out) - { - __it_res_1 = __res.first; - __it_res_2 = __res.second; - } - }); - }); + auto __res = __pattern_merge_2(__tag, std::forward<_ExecutionPolicy>(__exec), __it_1, __n_1, __it_2, __n_2, __it_out, __n_out, __comp_2); using __return_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, std::ranges::borrowed_iterator_t<_OutRange>>; - return __return_type{__it_res_1, __it_res_2, std::ranges::begin(__out_r) + __n_out}; + return __return_type{__res.first, __res.second, __it_out + __n_out}; } //TODO: diff --git a/include/oneapi/dpl/pstl/parallel_backend_tbb.h b/include/oneapi/dpl/pstl/parallel_backend_tbb.h index 32efecd95a2..77f150ef7b8 100644 --- a/include/oneapi/dpl/pstl/parallel_backend_tbb.h +++ b/include/oneapi/dpl/pstl/parallel_backend_tbb.h @@ -92,10 +92,11 @@ class __parallel_for_body // wrapper over tbb::parallel_for template void -__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f) +__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f, + std::size_t __grainsize = 1) { tbb::this_task_arena::isolate([=]() { - tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last), __parallel_for_body<_Index, _Fp>(__f)); + tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last, __grainsize), __parallel_for_body<_Index, _Fp>(__f)); }); }