Skip to content

Commit

Permalink
[oneDPL] Remove local (in-group) atomic usage from __parallel_find_or (
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyKopienko authored Jul 9, 2024
1 parent 568c483 commit a1798c3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 50 deletions.
130 changes: 80 additions & 50 deletions include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,8 @@ struct __parallel_find_forward_tag
using _AtomicType = oneapi::dpl::__internal::__difference_t<_RangeType>;
#endif

using _LocalResultsReduceOp = __dpl_sycl::__minimum<_AtomicType>;

// The template parameter is intended to unify __init_value in tags.
template <typename _DiffType>
constexpr static _AtomicType
Expand All @@ -973,12 +975,21 @@ struct __parallel_find_forward_tag
return __val;
}

template <typename _TAtomic>
// Since the search runs from the beginning to the end of the data, we should save the first (minimal) found state
// in the __save_state_to (local state) / __save_state_to_atomic (global state) methods.

template <sycl::access::address_space _Space>
static void
__save_state_to_atomic(_TAtomic& __found, _AtomicType __new_state)
__save_state_to_atomic(__dpl_sycl::__atomic_ref<_AtomicType, _Space>& __atomic, _AtomicType __new_state)
{
// As far as we make search from begin to the end of data, we should save the first (minimal) found state.
__found.fetch_min(__new_state);
__atomic.fetch_min(__new_state);
}

// Local (non-atomic) state update for the forward search:
// keeps the smaller of the currently stored state and the newly found one.
template <typename _TFoundState>
static void
__save_state_to(_TFoundState& __found, _AtomicType __new_state)
{
    // Overwrite only when the new state is strictly smaller; otherwise the stored value is left untouched.
    if (__new_state < __found)
        __found = __new_state;
}
};

Expand All @@ -993,18 +1004,29 @@ struct __parallel_find_backward_tag
using _AtomicType = oneapi::dpl::__internal::__difference_t<_RangeType>;
#endif

using _LocalResultsReduceOp = __dpl_sycl::__maximum<_AtomicType>;

template <typename _DiffType>
constexpr static _AtomicType __init_value(_DiffType)
{
return _AtomicType{-1};
}

template <typename _TAtomic>
// Since the search runs from the end to the beginning of the data, we should save the last (maximal) found state
// in the __save_state_to (local state) / __save_state_to_atomic (global state) methods.

template <sycl::access::address_space _Space>
static void
__save_state_to_atomic(_TAtomic& __found, _AtomicType __new_state)
__save_state_to_atomic(__dpl_sycl::__atomic_ref<_AtomicType, _Space>& __atomic, _AtomicType __new_state)
{
// As far as we make search from end to the begin of data, we should save the last (maximal) found state.
__found.fetch_max(__new_state);
__atomic.fetch_max(__new_state);
}

// Local (non-atomic) state update for the backward search:
// keeps the larger of the currently stored state and the newly found one.
template <typename _TFoundState>
static void
__save_state_to(_TFoundState& __found, _AtomicType __new_state)
{
    // Overwrite only when the new state is strictly larger; otherwise the stored value is left untouched.
    if (__found < __new_state)
        __found = __new_state;
}
};

Expand All @@ -1013,19 +1035,31 @@ struct __parallel_or_tag
{
using _AtomicType = int32_t;

using _LocalResultsReduceOp = __dpl_sycl::__bit_or<_AtomicType>;

// The template parameter is intended to unify __init_value in tags.
template <typename _DiffType>
constexpr static _AtomicType __init_value(_DiffType)
{
return 0;
}

template <typename _TAtomic>
// Value stored to indicate that a match was found. Its position is not relevant for "or" semantics.
// It is used in the __save_state_to (local state) / __save_state_to_atomic (global state) methods.
static constexpr _AtomicType __found_state = 1;

template <sycl::access::address_space _Space>
static void
__save_state_to_atomic(_TAtomic& __found, _AtomicType /*__new_state*/)
__save_state_to_atomic(__dpl_sycl::__atomic_ref<_AtomicType, _Space>& __atomic, _AtomicType /*__new_state*/)
{
// Store that a match was found. Its position is not relevant for or semantics.
__found.store(1);
__atomic.store(__found_state);
}

// Marks the local (non-atomic) found state as "match found" by assigning __found_state to it.
// The second argument (the position of the match) is intentionally ignored.
template <typename _TFoundState>
static void
__save_state_to(_TFoundState& __found, _AtomicType /*__new_state*/)
{
__found = __found_state;
}
};

Expand All @@ -1052,11 +1086,11 @@ struct __early_exit_find_or
{
_Pred __pred;

template <typename _NDItemId, typename _IterSize, typename _WgSize, typename _LocalAtomic, typename _BrickTag,
template <typename _NDItemId, typename _IterSize, typename _WgSize, typename _LocalFoundState, typename _BrickTag,
typename... _Ranges>
void
operator()(const _NDItemId __item_id, const _IterSize __n_iter, const _WgSize __wg_size,
_LocalAtomic& __found_local, _BrickTag __brick_tag, _Ranges&&... __rngs) const
_LocalFoundState& __found_local, _BrickTag __brick_tag, _Ranges&&... __rngs) const
{
// There are 3 possible tag types here:
// - __parallel_find_forward_tag : in case when we find the first value in the data;
Expand All @@ -1081,27 +1115,22 @@ struct __early_exit_find_or
bool __something_was_found = false;
for (_IterSize __i = 0; !__something_was_found && __i < __n_iter; ++__i)
{
//in case of find-semantic __shifted_idx must be the same type as the atomic for a correct comparison
using _ShiftedIdxType = ::std::conditional_t<_OrTagType::value, decltype(__init_index + __i * __shift),
decltype(__found_local.load())>;

_IterSize __current_iter = __i;
if constexpr (__is_backward_tag(__brick_tag))
__current_iter = __n_iter - 1 - __i;

_ShiftedIdxType __shifted_idx = __init_index + __current_iter * __shift;
// TODO:[Performance] the issue with atomic load (in comparison with __shifted_idx for early exit)
// should be investigated later, with other HW
const auto __shifted_idx = __init_index + __current_iter * __shift;

if (__shifted_idx < __n && __pred(__shifted_idx, __rngs...))
{
// Update local (for group) atomic state with the found index
_BrickTag::__save_state_to_atomic(__found_local, __shifted_idx);
// Update local found state
_BrickTag::__save_state_to(__found_local, __shifted_idx);

// This break is mandatory from the performance point of view.
// This break is safe for all our cases:
// 1) __parallel_find_forward_tag : when we search for the first matching data entry, we process data from start to end (forward direction).
// This means that after first found entry there is no reason to process data anymore.
// 2) __parallel_find_backward_tag : when we search for the last matching data entry, we process data from end to start (backward direction).
// 2) __parallel_find_backward_tag : when we search for the last matching data entry, we process data from end to start (backward direction).
// This means that after the first found entry there is no reason to process data anymore too.
// 3) __parallel_or_tag : when we search for any matching data entry, we process data from start to end (forward direction).
// This means that after the first found entry there is no reason to process data anymore too.
Expand Down Expand Up @@ -1139,15 +1168,20 @@ __parallel_find_or(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPoli
assert(__rng_n > 0);

// TODO: find a way to generalize getting of reliable work-group size
auto __wgroup_size = oneapi::dpl::__internal::__max_work_group_size(__exec);
std::size_t __wgroup_size = oneapi::dpl::__internal::__max_work_group_size(__exec);
#if _ONEDPL_COMPILE_KERNEL
auto __kernel = __internal::__kernel_compiler<_FindOrKernel>::__compile(__exec);
__wgroup_size = ::std::min(__wgroup_size, oneapi::dpl::__internal::__kernel_work_group_size(__exec, __kernel));
#endif

#if _ONEDPL_FPGA_EMU
// Limit the maximum work-group size to minimize the cost of work-group reduction.
// Limiting this also helps to avoid huge work-group sizes on some devices (e.g., FPGA emulation).
__wgroup_size = std::min(__wgroup_size, (std::size_t)2048);
#endif
auto __max_cu = oneapi::dpl::__internal::__max_compute_units(__exec);

auto __n_groups = oneapi::dpl::__internal::__dpl_ceiling_div(__rng_n, __wgroup_size);
// TODO: try to change __n_groups with another formula for more perfect load balancing
__n_groups = ::std::min(__n_groups, decltype(__n_groups)(__max_cu));

auto __n_iter = oneapi::dpl::__internal::__dpl_ceiling_div(__rng_n, __n_groups * __wgroup_size);
Expand All @@ -1161,15 +1195,13 @@ __parallel_find_or(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPoli

// scope is to copy data back to __result after destruction of temporary sycl::buffer
{
auto __temp = sycl::buffer<_AtomicType, 1>(&__result, 1); // temporary storage for global atomic
sycl::buffer<_AtomicType, 1> __result_sycl_buf(&__result, 1); // temporary storage for global atomic

// main parallel_for
__exec.queue().submit([&](sycl::handler& __cgh) {
oneapi::dpl::__ranges::__require_access(__cgh, __rngs...);
auto __temp_acc = __temp.template get_access<access_mode::read_write>(__cgh);
auto __result_sycl_buf_acc = __result_sycl_buf.template get_access<access_mode::read_write>(__cgh);

// create local accessor to connect atomic with
__dpl_sycl::__local_accessor<_AtomicType> __temp_local(1, __cgh);
#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT
__cgh.use_kernel_bundle(__kernel.get_kernel_bundle());
#endif
Expand All @@ -1182,37 +1214,35 @@ __parallel_find_or(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPoli
[=](sycl::nd_item</*dim=*/1> __item_id) {
auto __local_idx = __item_id.get_local_id(0);

__dpl_sycl::__atomic_ref<_AtomicType, sycl::access::address_space::global_space> __found(
*__dpl_sycl::__get_accessor_ptr(__temp_acc));
__dpl_sycl::__atomic_ref<_AtomicType, sycl::access::address_space::local_space> __found_local(
*__dpl_sycl::__get_accessor_ptr(__temp_local));

// 1. Set initial value to local atomic
if (__local_idx == 0)
__found_local.store(__init_value);
__dpl_sycl::__group_barrier(__item_id);
// 1. Set initial value to local found state
_AtomicType __found_local = __init_value;

// 2. Find any element that satisfies pred and set local atomic value to global atomic
// 2. Find any element that satisfies pred
__pred(__item_id, __n_iter, __wgroup_size, __found_local, __brick_tag, __rngs...);
__dpl_sycl::__group_barrier(__item_id);

// Set local atomic value to global atomic
if (__local_idx == 0)
// 3. Reduce over group: find __dpl_sycl::__minimum (for the __parallel_find_forward_tag),
// find __dpl_sycl::__maximum (for the __parallel_find_backward_tag)
// or update state with __dpl_sycl::__bit_or (for the __parallel_or_tag)
// inside all our group items
__found_local = __dpl_sycl::__reduce_over_group(__item_id.get_group(), __found_local,
typename _BrickTag::_LocalResultsReduceOp{});

// Set local found state value to global atomic
if (__local_idx == 0 && __found_local != __init_value)
{
const auto __found_local_state = __found_local.load();
if (__found_local_state != __init_value)
{
// Update global (for all groups) atomic state with the found index
_BrickTag::__save_state_to_atomic(__found, __found_local_state);
}
__dpl_sycl::__atomic_ref<_AtomicType, sycl::access::address_space::global_space> __found(
*__dpl_sycl::__get_accessor_ptr(__result_sycl_buf_acc));

// Update global (for all groups) atomic state with the found index
_BrickTag::__save_state_to_atomic(__found, __found_local);
}
});
});
//The end of the scope - a point of synchronization (on temporary sycl buffer destruction)
}

if constexpr (__or_tag_check)
return __result;
return __result != __init_value;
else
return __result != __init_value ? __result : __rng_n;
}
Expand Down
5 changes: 5 additions & 0 deletions include/oneapi/dpl/pstl/hetero/dpcpp/sycl_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ using __maximum = sycl::maximum<_T>;
template <typename _T = void>
using __minimum = sycl::minimum<_T>;

// Alias for the sycl::bit_or function object.
template <typename _T = void>
using __bit_or = sycl::bit_or<_T>;
#else // _ONEDPL_SYCL2020_FUNCTIONAL_OBJECTS_PRESENT
template <typename _T>
using __plus = sycl::ONEAPI::plus<_T>;
Expand All @@ -139,6 +141,9 @@ using __maximum = sycl::ONEAPI::maximum<_T>;

template <typename _T>
using __minimum = sycl::ONEAPI::minimum<_T>;

// Alias for the sycl::ONEAPI::bit_or function object
// (the #else branch of _ONEDPL_SYCL2020_FUNCTIONAL_OBJECTS_PRESENT).
template <typename _T = void>
using __bit_or = sycl::ONEAPI::bit_or<_T>;
#endif // _ONEDPL_SYCL2020_FUNCTIONAL_OBJECTS_PRESENT

template <typename _Buffer>
Expand Down

0 comments on commit a1798c3

Please sign in to comment.