From c1ff14ba53dd9da1785ab1417e0de05faaf1c7e2 Mon Sep 17 00:00:00 2001 From: MikeDvorskiy Date: Tue, 10 Dec 2024 17:55:37 +0100 Subject: [PATCH 1/2] [oneDPL][make] + usage ONEAPI_DEVICE_SELECTOR variable --- make/Makefile.common | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/make/Makefile.common b/make/Makefile.common index 610d42d68b4..5f47da61a31 100644 --- a/make/Makefile.common +++ b/make/Makefile.common @@ -49,13 +49,13 @@ endif # !os_name cfg ?= release -device_type ?= GPU +device_type ?= level_zero:gpu use_unnamed_lambda ?= 0 ranges_api ?= 0 ifeq ($(backend), sycl) ifeq ($(findstring FPGA, $(device_type)),) - export SYCL_DEVICE_FILTER=$(device_type) + export ONEAPI_DEVICE_SELECTOR=$(device_type) endif endif From bd19d4032888a7e7ff97c48454d45d3777396f67 Mon Sep 17 00:00:00 2001 From: MikeDvorskiy Date: Wed, 15 Jan 2025 14:29:56 +0100 Subject: [PATCH 2/2] [oneDPL][ranges][merge] support size limit for output --- include/oneapi/dpl/pstl/algorithm_impl.h | 123 +++++++++++++++++- .../oneapi/dpl/pstl/algorithm_ranges_impl.h | 36 ++--- .../dpl/pstl/glue_algorithm_ranges_impl.h | 7 +- .../hetero/algorithm_ranges_impl_hetero.h | 50 ++++--- .../dpcpp/parallel_backend_sycl_merge.h | 90 ++++++++----- .../dpcpp/parallel_backend_sycl_utils.h | 52 ++++++-- include/oneapi/dpl/pstl/omp/parallel_for.h | 11 +- include/oneapi/dpl/pstl/parallel_backend.h | 3 + .../oneapi/dpl/pstl/parallel_backend_serial.h | 2 +- .../oneapi/dpl/pstl/parallel_backend_tbb.h | 5 +- include/oneapi/dpl/pstl/unseq_backend_simd.h | 34 +++++ .../ranges/std_ranges_merge.pass.cpp | 58 +++++++-- test/parallel_api/ranges/std_ranges_test.h | 9 +- 13 files changed, 373 insertions(+), 107 deletions(-) diff --git a/include/oneapi/dpl/pstl/algorithm_impl.h b/include/oneapi/dpl/pstl/algorithm_impl.h index ae9094f721a..e6513e10399 100644 --- a/include/oneapi/dpl/pstl/algorithm_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_impl.h @@ -31,6 +31,7 @@ #include "parallel_backend.h" #include "parallel_impl.h" #include "iterator_impl.h" +#include "../functional" #if _ONEDPL_HETERO_BACKEND # include "hetero/algorithm_impl_hetero.h" // for __pattern_fill_n, __pattern_generate_n @@ -2948,6 +2949,49 @@ __pattern_remove_if(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, // merge //------------------------------------------------------------------------ +template +std::pair +__brick_merge_2(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp, + /* __is_vector = */ std::false_type) +{ + while(__it_1 != __it_1_e && __it_2 != __it_2_e) + { + if (__comp(*__it_1, *__it_2)) + { + *__it_out = *__it_1; + ++__it_out, ++__it_1; + } + else + { + *__it_out = *__it_2; + ++__it_out, ++__it_2; + } + if(__it_out == __it_out_e) + return {__it_1, __it_2}; + } + + if(__it_1 == __it_1_e) + { + for(; __it_2 != __it_2_e && __it_out != __it_out_e; ++__it_2, ++__it_out) + *__it_out = *__it_2; + } + else + { + //assert(__it_2 == __it_2_e); + for(; __it_1 != __it_1_e && __it_out != __it_out_e; ++__it_1, ++__it_out) + *__it_out = *__it_1; + } + return {__it_1, __it_2}; +} + +template +std::pair +__brick_merge_2(It1 __it_1, It1 __it_1_e, It2 __it_2, It2 __it_2_e, ItOut __it_out, ItOut __it_out_e, _Comp __comp, + /* __is_vector = */ std::true_type) +{ + return __unseq_backend::__simd_merge(__it_1, __it_1_e, __it_2, __it_2_e, __it_out, __it_out_e, __comp); +} + template _OutputIterator __brick_merge(_ForwardIterator1 __first1, _ForwardIterator1 __last1, _ForwardIterator2 __first2, @@ -2980,10 +3024,87 @@ __pattern_merge(_Tag, _ExecutionPolicy&&, _ForwardIterator1 __first1, _ForwardIt typename _Tag::__is_vector{}); } +template +std::pair<_It1, _It2> +__pattern_merge_2(_Tag, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, + _Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp) +{ + return __brick_merge_2(__it_1, __it_1 + __n_1, __it_2, __it_2 + __n_2, __it_out, __it_out + __n_out, __comp, + typename _Tag::__is_vector{}); +} + +template +std::pair<_It1, _It2> +__pattern_merge_2(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, + _Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp) +{ + using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag; + + _It1 __it_res_1; + _It2 __it_res_2; + + __internal::__except_handler([&]() { + __par_backend::__parallel_for(__backend_tag{}, std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out, + [=, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j) + { + //a start merging point on the merge path; for each thread + _Index1 __r = 0; //row index + _Index2 __c = 0; //column index + + if(__i > 0) + { + //calc merge path intersection: + const _Index3 __d_size = + std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1; + + auto __get_row = [__i, __n_1](auto __d) + { return std::min<_Index1>(__i, __n_1) - __d - 1; }; + auto __get_column = [__i, __n_1](auto __d) + { return std::max<_Index1>(0, __i - __n_1 - 1) + __d + (__i / (__n_1 + 1) > 0 ? 1 : 0); }; + + oneapi::dpl::counting_iterator<_Index3> __it_d(0); + + auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1, + [&](auto __d, auto __val) { + auto __r = __get_row(__d); + auto __c = __get_column(__d); + + oneapi::dpl::__internal::__compare<_Comp, oneapi::dpl::identity> + __cmp{__comp, oneapi::dpl::identity{}}; + const auto __res = (__cmp(__it_1[__r], __it_2[__c]) ? 1 : 0); + + return __res < __val; + } + ); + + //intersection point + __r = __get_row(__res_d); + __c = __get_column(__res_d); + ++__r; //to get a merge matrix ceil, lying on the current diagonal + } + + //serial merge n elements, starting from input x and y, to [i, j) output range + auto __res = __brick_merge_2(__it_1 + __r, __it_1 + __n_1, + __it_2 + __c, __it_2 + __n_2, + __it_out + __i, __it_out + __j, __comp, _IsVector{}); + + if(__j == __n_out) + { + __it_res_1 = __res.first; + __it_res_2 = __res.second; + } + }, _ONEDPL_MERGE_CUT_OFF); //grainsize + }); + + return {__it_res_1, __it_res_2}; +} + template _RandomAccessIterator3 -__pattern_merge(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1, +__pattern_merge(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, _RandomAccessIterator1 __first1, _RandomAccessIterator1 __last1, _RandomAccessIterator2 __first2, _RandomAccessIterator2 __last2, _RandomAccessIterator3 __d_first, _Compare __comp) { diff --git a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h index 55d29a56be8..2c7d9873079 100644 --- a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h @@ -448,31 +448,31 @@ auto __pattern_merge(_Tag __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, _Proj1 __proj1, _Proj2 __proj2) { - static_assert(__is_parallel_tag_v<_Tag> || typename _Tag::__is_vector{}); - assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only - + using __return_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, + std::ranges::borrowed_iterator_t<_OutRange>>; + auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp, std::invoke(__proj1, std::forward(__val1)), std::invoke(__proj2, std::forward(__val2)));}; - auto __res = oneapi::dpl::__internal::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec), - std::ranges::begin(__r1), std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2), - std::ranges::begin(__r2) + std::ranges::size(__r2), std::ranges::begin(__out_r), __comp_2); + using _Index1 = std::ranges::range_difference_t<_R1>; + using _Index2 = std::ranges::range_difference_t<_R2>; + using _Index3 = std::ranges::range_difference_t<_OutRange>; - using __return_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, - std::ranges::borrowed_iterator_t<_OutRange>>; + _Index1 __n_1 = std::ranges::size(__r1); + _Index2 __n_2 = std::ranges::size(__r2); + _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r)); - return __return_type{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + std::ranges::size(__r2), __res}; -} + auto __it_1 = std::ranges::begin(__r1); + auto __it_2 = std::ranges::begin(__r2); + auto __it_out = std::ranges::begin(__out_r); -template -auto -__pattern_merge(__serial_tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, - _Proj1 __proj1, _Proj2 __proj2) -{ - return std::ranges::merge(std::forward<_R1>(__r1), std::forward<_R2>(__r2), std::ranges::begin(__out_r), __comp, __proj1, - __proj2); + if(__n_out == 0) + return __return_type{__it_1, __it_2, __it_out}; + + auto __res = __pattern_merge_2(__tag, std::forward<_ExecutionPolicy>(__exec), __it_2, __n_2, __it_1, __n_1, __it_out, __n_out, __comp_2); + + return __return_type{__res.second, __res.first, __it_out + __n_out}; } } // namespace __ranges diff --git a/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h b/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h index 77a4875ccde..f2fc3840632 100644 --- a/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h +++ b/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h @@ -1173,9 +1173,12 @@ merge(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& _ { const auto __dispatch_tag = oneapi::dpl::__ranges::__select_backend(__exec, __rng1, __rng2, __rng3); - return oneapi::dpl::__internal::__ranges::__pattern_merge( + auto __view_res = views::all_write(::std::forward<_Range3>(__rng3)); + oneapi::dpl::__internal::__ranges::__pattern_merge( __dispatch_tag, ::std::forward<_ExecutionPolicy>(__exec), views::all_read(::std::forward<_Range1>(__rng1)), - views::all_read(::std::forward<_Range2>(__rng2)), views::all_write(::std::forward<_Range3>(__rng3)), __comp); + views::all_read(::std::forward<_Range2>(__rng2)), __view_res, __comp); + + return __view_res.size(); } template diff --git a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h index da7820b91a2..a16bdb6cdde 100644 --- a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h +++ b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h @@ -51,10 +51,11 @@ namespace __ranges //------------------------------------------------------------------------ template -void +auto __pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function __f, _Ranges&&... __rngs) { - auto __n = oneapi::dpl::__ranges::__get_first_range_size(__rngs...); + using _Size = std::make_unsigned_t...>>; + auto __n = std::min({_Size(__rngs.size())...}); if (__n > 0) { oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), @@ -62,6 +63,7 @@ __pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function ::std::forward<_Ranges>(__rngs)...) .__deferrable_wait(); } + return __n; } #if _ONEDPL_CPP20_RANGES_PRESENT @@ -680,44 +682,44 @@ struct __copy2_wrapper; template -oneapi::dpl::__internal::__difference_t<_Range3> +std::pair, oneapi::dpl::__internal::__difference_t<_Range2>> __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) { auto __n1 = __rng1.size(); auto __n2 = __rng2.size(); - auto __n = __n1 + __n2; - if (__n == 0) - return 0; + if (__rng3.size() == 0) + return {0, 0}; //To consider the direct copying pattern call in case just one of sequences is empty. if (__n1 == 0) { - oneapi::dpl::__internal::__ranges::__pattern_walk_n( + auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n( __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy1_wrapper>( ::std::forward<_ExecutionPolicy>(__exec)), oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{}, ::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3)); + return {0, __res}; } - else if (__n2 == 0) + + if (__n2 == 0) { - oneapi::dpl::__internal::__ranges::__pattern_walk_n( + auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n( __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy2_wrapper>( ::std::forward<_ExecutionPolicy>(__exec)), oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{}, ::std::forward<_Range1>(__rng1), ::std::forward<_Range3>(__rng3)); - } - else - { - __par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), - ::std::forward<_Range1>(__rng1), ::std::forward<_Range2>(__rng2), - ::std::forward<_Range3>(__rng3), __comp) - .__deferrable_wait(); + return {__res, 0}; } - return __n; + auto __res = __par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), + ::std::forward<_Range1>(__rng1), ::std::forward<_Range2>(__rng2), + ::std::forward<_Range3>(__rng3), __comp); + + auto __val = __res.get(); + return {__val.first, __val.second}; } #if _ONEDPL_CPP20_RANGES_PRESENT @@ -727,12 +729,18 @@ auto __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, _Proj1 __proj1, _Proj2 __proj2) { - assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only - auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp, std::invoke(__proj1, std::forward(__val1)), std::invoke(__proj2, std::forward(__val2)));}; + using _Index1 = std::ranges::range_difference_t<_R1>; + using _Index2 = std::ranges::range_difference_t<_R2>; + using _Index3 = std::ranges::range_difference_t<_OutRange>; + + _Index1 __n_1 = std::ranges::size(__r1); + _Index2 __n_2 = std::ranges::size(__r2); + _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r)); + auto __res = oneapi::dpl::__internal::__ranges::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec), oneapi::dpl::__ranges::views::all_read(__r1), oneapi::dpl::__ranges::views::all_read(__r2), oneapi::dpl::__ranges::views::all_write(__out_r), __comp_2); @@ -740,8 +748,8 @@ __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _R1& using __return_t = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, std::ranges::borrowed_iterator_t<_OutRange>>; - return __return_t{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + - std::ranges::size(__r2), std::ranges::begin(__out_r) + __res}; + return __return_t{std::ranges::begin(__r1) + __res.first, std::ranges::begin(__r2) + __res.second, + std::ranges::begin(__out_r) + __n_out}; } #endif //_ONEDPL_CPP20_RANGES_PRESENT diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h index b1a775dd8f4..4aec7da3458 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h @@ -133,9 +133,9 @@ __find_start_point(const _Rng1& __rng1, const _Index __rng1_from, _Index __rng1_ // Do serial merge of the data from rng1 (starting from start1) and rng2 (starting from start2) and writing // to rng3 (starting from start3) in 'chunk' steps, but do not exceed the total size of the sequences (n1 and n2) template -void +std::pair, oneapi::dpl::__internal::__difference_t<_Rng2>> __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _Index __start1, const _Index __start2, - const _Index __start3, const _Index __chunk, const _Index __n1, const _Index __n2, _Compare __comp) + const _Index __start3, const _Index __chunk, const _Index __n1, const _Index __n2, _Compare __comp, _Index __n3 = 0) { const _Index __rng1_size = std::min<_Index>(__n1 > __start1 ? __n1 - __start1 : _Index{0}, __chunk); const _Index __rng2_size = std::min<_Index>(__n2 > __start2 ? __n2 - __start2 : _Index{0}, __chunk); @@ -143,7 +143,9 @@ __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _I const _Index __rng1_idx_end = __start1 + __rng1_size; const _Index __rng2_idx_end = __start2 + __rng2_size; - const _Index __rng3_idx_end = __start3 + __rng3_size; + _Index __rng3_idx_end = __start3 + __rng3_size; + if(__n3 > 0) + __rng3_idx_end = std::min<_Index>(__n3, __rng3_idx_end); _Index __rng1_idx = __start1; _Index __rng2_idx = __start2; @@ -155,13 +157,14 @@ __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _I // One of __rng1_idx_less_n1 and __rng2_idx_less_n2 should be true here // because 1) we should fill output data with elements from one of the input ranges - // 2) we calculate __rng3_idx_end as std::min<_Index>(__rng1_size + __rng2_size, __chunk). + // 2) we calculate __rng3_idx_end as std::min<_Index>(__n3, __chunk). __rng3[__rng3_idx] = ((__rng1_idx_less_n1 && __rng2_idx_less_n2 && __comp(__rng2[__rng2_idx], __rng1[__rng1_idx])) || !__rng1_idx_less_n1) ? __rng2[__rng2_idx++] : __rng1[__rng1_idx++]; } + return {__rng1_idx, __rng2_idx}; } // Please see the comment for __parallel_for_submitter for optional kernel name explanation @@ -177,7 +180,7 @@ struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_N { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); - const _IdType __n = __n1 + __n2; + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); assert(__n1 > 0 || __n2 > 0); @@ -188,20 +191,34 @@ struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_N const _IdType __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk); + using __val_t = _split_point_t<_IdType>; + using __result_and_scratch_storage_t = __result_and_scratch_storage<_ExecutionPolicy, __val_t>; + auto __p_res_storage = new __result_and_scratch_storage_t(__exec, 1, 0); + + // Save the raw pointer into a shared_ptr to return it in __future and extend the lifetime of the storage. + std::shared_ptr<__result_and_scratch_storage_base> __p_result_base(__p_res_storage); + auto __event = __exec.queue().submit( - [&__rng1, &__rng2, &__rng3, __comp, __chunk, __steps, __n1, __n2](sycl::handler& __cgh) { - oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); - __cgh.parallel_for<_Name...>(sycl::range(__steps), [=](sycl::item __item_id) { - const _IdType __i_elem = __item_id.get_linear_id() * __chunk; - const auto __start = - __find_start_point(__rng1, _IdType{0}, __n1, __rng2, _IdType{0}, __n2, __i_elem, __comp); - __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __chunk, __n1, __n2, - __comp); - }); + [&__rng1, &__rng2, &__rng3, __p_res_storage, __comp, __chunk, __steps, __n, __n1, __n2](sycl::handler& __cgh) { + oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); + auto __result_acc = __p_res_storage->template __get_result_acc(__cgh, __dpl_sycl::__no_init{}); + + __cgh.parallel_for<_Name...>(sycl::range(__steps), [=](sycl::item __item_id) { + auto __id = __item_id.get_linear_id(); + const _IdType __i_elem = __id * __chunk; + + const auto __n_merge = std::min<_IdType>(__chunk, __n - __i_elem); + const auto __start = __find_start_point(__rng1, _IdType{0}, __n1, __rng2, _IdType{0}, __n2, __i_elem, __comp); + auto __ends = __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __n_merge, __n1, __n2, __comp, __n); + + if(__id == __steps - 1) //the last WI does additional work + { + auto __res_ptr = __result_and_scratch_storage_t::__get_usm_or_buffer_accessor_ptr(__result_acc); + *__res_ptr = __ends; + } }); - // We should return the same thing in the second param of __future for compatibility - // with the returning value in __parallel_merge_submitter_large::operator() - return __future(__event, std::shared_ptr<__result_and_scratch_storage_base>{}); + }); + return __future(std::move(__event), std::move(__p_result_base)); } }; @@ -225,10 +242,8 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, // Calculate nd-range parameters template nd_range_params - eval_nd_range_params(_ExecutionPolicy&& __exec, const _Range1& __rng1, const _Range2& __rng2) const + eval_nd_range_params(_ExecutionPolicy&& __exec, const _Range1& __rng1, const _Range2& __rng2, const std::size_t __n) const { - const std::size_t __n = __rng1.size() + __rng2.size(); - // Empirical number of values to process per work-item const std::uint8_t __chunk = __exec.queue().get_device().is_cpu() ? 128 : 4; @@ -244,13 +259,12 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, // Calculation of split points on each base diagonal template sycl::event - eval_split_points_for_groups(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Compare __comp, + eval_split_points_for_groups(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _IdType __n, _Compare __comp, const nd_range_params& __nd_range_params, _Storage& __base_diagonals_sp_global_storage) const { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); - const _IdType __n = __n1 + __n2; const _IdType __base_diag_chunk = __nd_range_params.steps_between_two_base_diags * __nd_range_params.chunk; @@ -288,13 +302,16 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); - return __exec.queue().submit([&__event, &__rng1, &__rng2, &__rng3, __comp, __nd_range_params, + return __exec.queue().submit([&__event, &__rng1, &__rng2, &__rng3, __n, __comp, __nd_range_params, __base_diagonals_sp_global_storage, __n1, __n2](sycl::handler& __cgh) { oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); auto __base_diagonals_sp_global_acc = __base_diagonals_sp_global_storage.template __get_scratch_acc(__cgh); + auto __result_acc = __base_diagonals_sp_global_storage.template __get_result_acc(__cgh, __dpl_sycl::__no_init{}); + __cgh.depends_on(__event); __cgh.parallel_for<_MergeKernelName...>( @@ -320,8 +337,13 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, __start = __base_diagonals_sp_global_ptr[__diagonal_idx]; } - __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, - __nd_range_params.chunk, __n1, __n2, __comp); + auto __ends = __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, + __nd_range_params.chunk, __n1, __n2, __comp, __n); + if(__global_idx == __nd_range_params.steps - 1) + { + auto __res_ptr = _Storage::__get_usm_or_buffer_accessor_ptr(__result_acc); + *__res_ptr = __ends; + } }); }); } @@ -331,24 +353,28 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, auto operator()(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) const { - assert(__rng1.size() > 0 || __rng2.size() > 0); + const _IdType __n1 = __rng1.size(); + const _IdType __n2 = __rng2.size(); + assert(__n1 > 0 || __n2 > 0); + + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); _PRINT_INFO_IN_DEBUG_MODE(__exec); // Calculate nd-range parameters - const nd_range_params __nd_range_params = eval_nd_range_params(__exec, __rng1, __rng2); + const nd_range_params __nd_range_params = eval_nd_range_params(__exec, __rng1, __rng2, __n); // Create storage to save split-points on each base diagonal + 1 (for the right base diagonal in the last work-group) - auto __p_base_diagonals_sp_global_storage = - new __result_and_scratch_storage<_ExecutionPolicy, _split_point_t<_IdType>>( - __exec, 0, __nd_range_params.base_diag_count + 1); + using __val_t = _split_point_t<_IdType>; + auto __p_base_diagonals_sp_global_storage = new __result_and_scratch_storage<_ExecutionPolicy, __val_t>(__exec, + 1, __nd_range_params.base_diag_count + 1); // Save the raw pointer into a shared_ptr to return it in __future and extend the lifetime of the storage. std::shared_ptr<__result_and_scratch_storage_base> __p_result_and_scratch_storage_base( static_cast<__result_and_scratch_storage_base*>(__p_base_diagonals_sp_global_storage)); // Find split-points on the base diagonals - sycl::event __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __comp, __nd_range_params, + sycl::event __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __n, __comp, __nd_range_params, *__p_base_diagonals_sp_global_storage); // Merge data using split points on each diagonal @@ -391,7 +417,7 @@ __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy using __value_type = oneapi::dpl::__internal::__value_t<_Range3>; - const std::size_t __n = __rng1.size() + __rng2.size(); + const std::uint64_t __n = std::min(__rng1.size() + __rng2.size(), __rng3.size()); if (__n < __get_starting_size_limit_for_large_submitter<__value_type>()) { using _WiIndex = std::uint32_t; diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h index 3d9d923d80d..5d82b5b4ac5 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h @@ -522,6 +522,7 @@ struct __usm_or_buffer_accessor struct __result_and_scratch_storage_base { virtual ~__result_and_scratch_storage_base() = default; + virtual std::size_t __get_data(sycl::event, std::size_t* __p_buf) const = 0; }; template @@ -654,6 +655,16 @@ struct __result_and_scratch_storage : __result_and_scratch_storage_base #endif } + _T + __wait_and_get_value(sycl::event __event) const + { + if (is_USM()) + __event.wait_and_throw(); + + return __get_value(); + } + +private: bool is_USM() const { @@ -663,17 +674,17 @@ struct __result_and_scratch_storage : __result_and_scratch_storage_base // Note: this member function assumes the result is *ready*, since the __future has already // waited on the relevant event. _T - __get_value(size_t idx = 0) const + __get_value() const { - assert(idx < __result_n); + assert( __result_n == 1); if (__use_USM_host && __supports_USM_device) { - return *(__result_buf.get() + idx); + return *(__result_buf.get()); } else if (__supports_USM_device) { _T __tmp; - __exec.queue().memcpy(&__tmp, __scratch_buf.get() + __scratch_n + idx, 1 * sizeof(_T)).wait(); + __exec.queue().memcpy(&__tmp, __scratch_buf.get() + __scratch_n, 1 * sizeof(_T)).wait(); return __tmp; } else @@ -682,14 +693,29 @@ struct __result_and_scratch_storage : __result_and_scratch_storage_base } } - template - _T - __wait_and_get_value(_Event&& __event, size_t idx = 0) const + template + std::size_t + __fill_data(std::pair<_Type, _Type>&& __p, std::size_t* __p_buf) const + { + __p_buf[0] = __p.first; + __p_buf[1] = __p.second; + return 2; + } + + template + std::size_t + __fill_data(_Args&&...) const + { + assert(!"Unsupported return type"); + return 0; + } + + virtual std::size_t __get_data(sycl::event __event, std::size_t* __p_buf) const override { if (is_USM()) __event.wait_and_throw(); - return __get_value(idx); + return __fill_data(__get_value(), __p_buf); } }; @@ -729,6 +755,16 @@ class __future : private std::tuple<_Args...> return __storage.__wait_and_get_value(__my_event); } + constexpr auto + __wait_and_get_value(const std::shared_ptr<__result_and_scratch_storage_base>& __p_storage) + { + std::size_t __buf[2] = {0, 0}; + auto __n = __p_storage->__get_data(__my_event, __buf); + assert(__n == 2); + + return std::pair{__buf[0], __buf[1]}; + } + template constexpr auto __wait_and_get_value(const _T& __val) diff --git a/include/oneapi/dpl/pstl/omp/parallel_for.h b/include/oneapi/dpl/pstl/omp/parallel_for.h index 1a0ea24d798..917b3089059 100644 --- a/include/oneapi/dpl/pstl/omp/parallel_for.h +++ b/include/oneapi/dpl/pstl/omp/parallel_for.h @@ -29,10 +29,10 @@ namespace __omp_backend template void -__parallel_for_body(_Index __first, _Index __last, _Fp __f) +__parallel_for_body(_Index __first, _Index __last, _Fp __f, std::size_t __grainsize) { // initial partition of the iteration space into chunks - auto __policy = oneapi::dpl::__omp_backend::__chunk_partitioner(__first, __last); + auto __policy = oneapi::dpl::__omp_backend::__chunk_partitioner(__first, __last, __grainsize); // To avoid over-subscription we use taskloop for the nested parallelism _PSTL_PRAGMA(omp taskloop untied mergeable) @@ -49,20 +49,21 @@ __parallel_for_body(_Index __first, _Index __last, _Fp __f) template void -__parallel_for(oneapi::dpl::__internal::__omp_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f) +__parallel_for(oneapi::dpl::__internal::__omp_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f, + std::size_t __grainsize = __default_chunk_size) { if (omp_in_parallel()) { // we don't create a nested parallel region in an existing parallel // region: just create tasks - oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f); + oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f, __grainsize); } else { // in any case (nested or non-nested) one parallel region is created and // only one thread creates a set of tasks _PSTL_PRAGMA(omp parallel) - _PSTL_PRAGMA(omp single nowait) { oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f); } + _PSTL_PRAGMA(omp single nowait) { oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f, __grainsize); } } } diff --git a/include/oneapi/dpl/pstl/parallel_backend.h b/include/oneapi/dpl/pstl/parallel_backend.h index b243e8fb492..841a9357eb7 100644 --- a/include/oneapi/dpl/pstl/parallel_backend.h +++ b/include/oneapi/dpl/pstl/parallel_backend.h @@ -35,6 +35,9 @@ # endif #endif +//the parallel backend constants +#define _ONEDPL_MERGE_CUT_OFF 2000 + namespace oneapi { namespace dpl diff --git a/include/oneapi/dpl/pstl/parallel_backend_serial.h b/include/oneapi/dpl/pstl/parallel_backend_serial.h index 6acd4b617f9..032306dbe69 100644 --- a/include/oneapi/dpl/pstl/parallel_backend_serial.h +++ b/include/oneapi/dpl/pstl/parallel_backend_serial.h @@ -45,7 +45,7 @@ __cancel_execution(oneapi::dpl::__internal::__serial_backend_tag) template void __parallel_for(oneapi::dpl::__internal::__serial_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, - _Fp __f) + _Fp __f, std::size_t __grainsize = 1) { __f(__first, __last); } diff --git a/include/oneapi/dpl/pstl/parallel_backend_tbb.h b/include/oneapi/dpl/pstl/parallel_backend_tbb.h index 59821e98156..a977fc3d1a9 100644 --- a/include/oneapi/dpl/pstl/parallel_backend_tbb.h +++ b/include/oneapi/dpl/pstl/parallel_backend_tbb.h @@ -92,10 +92,11 @@ class __parallel_for_body // wrapper over tbb::parallel_for template void -__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f) +__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f, + std::size_t __grainsize = 1) { tbb::this_task_arena::isolate([=]() { - tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last), __parallel_for_body<_Index, _Fp>(__f)); + tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last, __grainsize), __parallel_for_body<_Index, _Fp>(__f)); }); } diff --git a/include/oneapi/dpl/pstl/unseq_backend_simd.h b/include/oneapi/dpl/pstl/unseq_backend_simd.h index 7e454c80268..e3379456837 100644 --- a/include/oneapi/dpl/pstl/unseq_backend_simd.h +++ b/include/oneapi/dpl/pstl/unseq_backend_simd.h @@ -879,6 +879,40 @@ __simd_remove_if(_RandomAccessIterator __first, _DifferenceType __n, _UnaryPredi } return __current + __cnt; } + +template +std::pair<_Iterator1, _Iterator2> +__simd_merge(_Iterator1 __x, _Iterator1 __x_e, _Iterator2 __y, _Iterator2 __y_e, _Iterator3 __i, _Iterator3 __j, _Comp __comp) +{ + _ONEDPL_PRAGMA_SIMD + for(_Iterator3 __k = __i; __k < __j; ++__k) + { + if(__x >= __x_e) + { + assert(__y < __y_e); + *__k = *__y; + ++__y; + } + else if(__y >= __y_e) + { + assert(__x < __x_e); + *__k = *__x; + ++__x; + } + else if(std::invoke(__comp, *__x, *__y)) + { + *__k = *__x; + ++__x; + } + else + { + *__k = *__y; + ++__y; + } + } + return {__x, __y}; +} + } // namespace __unseq_backend } // namespace dpl } // namespace oneapi diff --git a/test/parallel_api/ranges/std_ranges_merge.pass.cpp b/test/parallel_api/ranges/std_ranges_merge.pass.cpp index 7752c82f275..2883d7db90f 100644 --- a/test/parallel_api/ranges/std_ranges_merge.pass.cpp +++ b/test/parallel_api/ranges/std_ranges_merge.pass.cpp @@ -25,25 +25,59 @@ main() //A checker below modifies a return type; a range based version with policy has another return type. auto merge_checker = [](std::ranges::random_access_range auto&& r_1, std::ranges::random_access_range auto&& r_2, - std::ranges::random_access_range auto&& r_out, auto&&... args) + std::ranges::random_access_range auto&& r_out, auto comp, auto proj1, + auto proj2) { - auto res = std::ranges::merge(std::forward(r_1), std::forward(r_2), - std::ranges::begin(r_out), std::forward(args)...); - using ret_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t, std::ranges::borrowed_iterator_t>; - return ret_type{res.in1, res.in2, res.out}; + + auto it_out = std::ranges::begin(r_out); + auto it_1 = std::ranges::begin(r_1); + auto it_2 = std::ranges::begin(r_2); + auto it_1_e = std::ranges::end(r_1); + auto it_2_e = std::ranges::end(r_2); + auto it_out_e = std::ranges::end(r_out); + + + while(it_1 != it_1_e && it_2 != it_2_e) + { + if (std::invoke(comp, std::invoke(proj2, *it_2), std::invoke(proj1, *it_1))) + { + *it_out = *it_2; + ++it_out, ++it_2; + } + else + { + *it_out = *it_1; + ++it_out, ++it_1; + } + if(it_out == it_out_e) + return ret_type{it_1, it_2, it_out}; + } + + if(it_1 == it_1_e) + { + for(; it_2 != it_2_e && it_out != it_out_e; ++it_2, ++it_out) + *it_out = *it_2; + } + else + { + for(; it_1 != it_1_e && it_out != it_out_e; ++it_1, ++it_out) + *it_out = *it_1; + } + + return ret_type{it_1, it_2, it_out}; }; - test_range_algo<0, int, data_in_in_out>{big_sz}(dpl_ranges::merge, merge_checker, std::ranges::less{}); + test_range_algo<0, int, data_in_in_out_lim>{big_sz}(dpl_ranges::merge, merge_checker, std::ranges::less{}, std::identity{}, std::identity{}); - test_range_algo<1, int, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, proj, proj); - test_range_algo<2, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::x, &P2::x); - test_range_algo<3, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::proj, &P2::proj); + test_range_algo<1, int, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, proj, proj); + test_range_algo<2, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::x, &P2::x); + test_range_algo<3, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::proj, &P2::proj); - test_range_algo<4, int, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, proj, proj); - test_range_algo<5, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::x, &P2::x); - test_range_algo<6, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::proj, &P2::proj); + test_range_algo<4, int, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, proj, proj); + test_range_algo<5, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::x, &P2::x); + test_range_algo<6, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::proj, &P2::proj); #endif //_ENABLE_STD_RANGES_TESTING return TestUtils::done(_ENABLE_STD_RANGES_TESTING); diff --git a/test/parallel_api/ranges/std_ranges_test.h b/test/parallel_api/ranges/std_ranges_test.h index 6057ccebfcd..51e0665a27f 100644 --- a/test/parallel_api/ranges/std_ranges_test.h +++ b/test/parallel_api/ranges/std_ranges_test.h @@ -277,14 +277,12 @@ struct test Container cont_in1(exec, n_in1, [](auto i) { return i;}); Container cont_in2(exec, n_in2, [](auto i) { return i/3;}); - const int max_n_out = max_n*2; - Container cont_out(exec, max_n_out, [](auto i) { return 0;}); - Container cont_exp(exec, max_n_out, [](auto i) { return 0;}); + Container cont_out(exec, n_out, [](auto i) { return 0;}); + Container cont_exp(exec, n_out, [](auto i) { return 0;}); assert(n_in1 <= max_n); assert(n_in2 <= max_n); - assert(n_out <= max_n_out); - + auto src_view1 = tr_in(std::views::all(cont_in1())); auto src_view2 = tr_in(std::views::all(cont_in2())); auto expected_view = tr_out(std::views::all(cont_exp())); @@ -322,6 +320,7 @@ struct test { const int r_size = max_n; process_data_in_in_out(r_size, r_size, r_size, exec, algo, checker, args...); + process_data_in_in_out(r_size, r_size, r_size*2, exec, algo, checker, args...); process_data_in_in_out(r_size/2, r_size, r_size, exec, algo, checker, args...); process_data_in_in_out(r_size, r_size/2, r_size, exec, algo, checker, args...); process_data_in_in_out(r_size, r_size, r_size/2, std::forward(exec), algo, checker, args...);