Change to device_allocate_and_pack to return unique_ptr
The unique_ptr owns the allocation, ensuring no leaks during
exception handling.

This also allows async_smart_free to be used to schedule
asynchronous deallocation of USM temporaries.
oleksandr-pavlyk committed Jan 9, 2025 · commit 9d77faf · 1 parent 9841f9e
Showing 21 changed files with 392 additions and 745 deletions.
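
The exception-safety problem the commit message describes can be seen in miniature below. This sketch is illustrative and not code from the repository; do_intervening_work() is a placeholder for any code (such as packing shapes and strides) that can throw between allocation and free.

#include <memory>
#include <sycl/sycl.hpp>

// placeholder for work that may throw between allocation and deallocation
void do_intervening_work() {}

void leaky(sycl::queue &q)
{
    int *tmp = sycl::malloc_device<int>(1024, q);
    do_intervening_work(); // if this throws, tmp is never freed
    sycl::free(tmp, q);
}

void leak_free(sycl::queue &q)
{
    auto del = [ctx = q.get_context()](int *p) { sycl::free(p, ctx); };
    std::unique_ptr<int, decltype(del)> tmp(sycl::malloc_device<int>(1024, q),
                                            del);
    do_intervening_work(); // if this throws, the deleter frees tmp during
}                          // stack unwinding; otherwise it runs here

Returning such a unique_ptr from device_allocate_and_pack extends the guarantee across the call boundary: the caller owns the allocation from the moment the function returns.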
22 changes: 13 additions & 9 deletions dpctl/tensor/libtensor/include/utils/offset_utils.hpp
@@ -28,10 +28,13 @@
 
 #include <algorithm>
 #include <cstddef>
-#include <sycl/sycl.hpp>
+#include <memory> // for std::make_shared, std::unique_ptr
 #include <tuple>
+#include <utility> // for std::move, std::forward
 #include <vector>
 
+#include <sycl/sycl.hpp>
+
 #include "kernels/dpctl_tensor_types.hpp"
 #include "utils/strided_iters.hpp"
 #include "utils/sycl_alloc_utils.hpp"
@@ -84,7 +87,9 @@ std::vector<T, A> concat(std::vector<T, A> lhs, Vs &&...vs)
 } // namespace detail
 
 template <typename indT, typename... Vs>
-std::tuple<indT *, std::size_t, sycl::event>
+std::tuple<std::unique_ptr<indT, dpctl::tensor::alloc_utils::USMDeleter>,
+           std::size_t,
+           sycl::event>
 device_allocate_and_pack(sycl::queue &q,
                          std::vector<sycl::event> &host_task_events,
                          Vs &&...vs)
@@ -105,25 +110,24 @@ device_allocate_and_pack(sycl::queue &q,
         std::make_shared<shT>(std::move(packed_shape_strides));
 
     auto sz = packed_shape_strides_owner->size();
-    indT *shape_strides = sycl::malloc_device<indT>(sz, q);
-
-    if (shape_strides == nullptr) {
-        return std::make_tuple(shape_strides, 0, sycl::event());
-    }
+    auto shape_strides_owner =
+        dpctl::tensor::alloc_utils::smart_malloc_device<indT>(sz, q);
+    indT *shape_strides = shape_strides_owner.get();
 
     sycl::event copy_ev =
         q.copy<indT>(packed_shape_strides_owner->data(), shape_strides, sz);
 
     sycl::event cleanup_host_task_ev = q.submit([&](sycl::handler &cgh) {
         cgh.depends_on(copy_ev);
-        cgh.host_task([packed_shape_strides_owner] {
+        cgh.host_task([packed_shape_strides_owner =
+                           std::move(packed_shape_strides_owner)] {
             // increment shared pointer ref-count to keep it alive
             // till copy operation completes;
         });
     });
     host_task_events.push_back(cleanup_host_task_ev);
 
-    return std::make_tuple(shape_strides, sz, copy_ev);
+    return std::make_tuple(std::move(shape_strides_owner), sz, copy_ev);
 }
 
 struct NoOpIndexer
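
The rewritten function relies on smart_malloc_device and USMDeleter from "utils/sycl_alloc_utils.hpp", which this diff includes but does not define. The sketch below shows a plausible shape for such helpers, under the assumption that allocation failure is reported by throwing; that assumption is what lets the explicit shape_strides == nullptr check above be deleted. It is not the library's exact implementation.

#include <cstddef>
#include <memory>
#include <stdexcept>
#include <sycl/sycl.hpp>

// deleter that remembers the allocation context so it can call sycl::free
struct USMDeleter
{
    sycl::context ctx_;
    void operator()(void *ptr) const { sycl::free(ptr, ctx_); }
};

template <typename T>
std::unique_ptr<T, USMDeleter> smart_malloc_device(std::size_t count,
                                                   const sycl::queue &q)
{
    T *ptr = sycl::malloc_device<T>(count, q);
    if (ptr == nullptr) {
        // failure surfaces as an exception, so callers need no nullptr checks
        throw std::runtime_error("Unable to allocate device memory");
    }
    return std::unique_ptr<T, USMDeleter>{ptr, USMDeleter{q.get_context()}};
}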
35 changes: 15 additions & 20 deletions dpctl/tensor/libtensor/source/accumulators.cpp
@@ -196,14 +196,11 @@ std::size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
             : mask_positions_strided_i64_dispatch_vector[mask_typeid];
 
     using dpctl::tensor::offset_utils::device_allocate_and_pack;
-    const auto &ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
+    auto ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
         exec_q, host_task_events, compact_shape, compact_strides);
-    py::ssize_t *shape_strides = std::get<0>(ptr_size_event_tuple);
-    if (shape_strides == nullptr) {
-        sycl::event::wait(host_task_events);
-        throw std::runtime_error("Unexpected error");
-    }
+    auto shape_strides_owner = std::move(std::get<0>(ptr_size_event_tuple));
     sycl::event copy_shape_ev = std::get<2>(ptr_size_event_tuple);
+    const py::ssize_t *shape_strides = shape_strides_owner.get();
 
     if (2 * static_cast<std::size_t>(nd) != std::get<1>(ptr_size_event_tuple)) {
         {
@@ -212,8 +209,8 @@ std::size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
             copy_shape_ev.wait();
             sycl::event::wait(host_task_events);
 
-            using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-            sycl_free_noexcept(shape_strides, exec_q);
+            // ensure deleter of smart pointer is invoked with GIL released
+            shape_strides_owner.reset(nullptr);
         }
         throw std::runtime_error("Unexpected error");
     }
@@ -233,8 +230,8 @@ std::size_t py_mask_positions(const dpctl::tensor::usm_ndarray &mask,
             cumsum_data, host_task_events, dependent_events);
 
         sycl::event::wait(host_task_events);
-        using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-        sycl_free_noexcept(shape_strides, exec_q);
+        // ensure deleter of smart pointer is invoked with GIL released
+        shape_strides_owner.reset(nullptr);
     }
 
     return total_set;
@@ -356,24 +353,22 @@ std::size_t py_cumsum_1d(const dpctl::tensor::usm_ndarray &src,
     }
 
     using dpctl::tensor::offset_utils::device_allocate_and_pack;
-    const auto &ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
+    auto ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
         exec_q, host_task_events, compact_shape, compact_strides);
-    py::ssize_t *shape_strides = std::get<0>(ptr_size_event_tuple);
-    if (shape_strides == nullptr) {
-        sycl::event::wait(host_task_events);
-        throw std::runtime_error("Unexpected error");
-    }
+    auto shape_strides_owner = std::move(std::get<0>(ptr_size_event_tuple));
     sycl::event copy_shape_ev = std::get<2>(ptr_size_event_tuple);
+    const py::ssize_t *shape_strides = shape_strides_owner.get();
 
     if (2 * static_cast<std::size_t>(nd) != std::get<1>(ptr_size_event_tuple)) {
         {
             py::gil_scoped_release release;
 
             copy_shape_ev.wait();
             sycl::event::wait(host_task_events);
+
+            // ensure USM deleter is called with GIL released
+            shape_strides_owner.reset(nullptr);
         }
-        using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-        sycl_free_noexcept(shape_strides, exec_q);
         throw std::runtime_error("Unexpected error");
     }
 
@@ -391,8 +386,8 @@ std::size_t py_cumsum_1d(const dpctl::tensor::usm_ndarray &src,
         py::gil_scoped_release release;
         sycl::event::wait(host_task_events);
 
-        using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-        sycl_free_noexcept(shape_strides, exec_q);
+        // ensure USM deleter is called with GIL released
+        shape_strides_owner.reset(nullptr);
    }
 
     return total;
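
In the rewritten error paths above, the owner is reset explicitly inside the py::gil_scoped_release block rather than left to be destroyed at end of scope, where the GIL would already have been re-acquired. Freeing USM memory can block, so the deleter should run while the GIL is released. A minimal sketch of the idiom follows; the helper name and OwnerT are placeholders, not dpctl API.

#include <memory>
#include <vector>

#include <pybind11/pybind11.h>
#include <sycl/sycl.hpp>

namespace py = pybind11;

// OwnerT stands for a unique_ptr that owns a USM allocation
template <typename OwnerT>
void wait_and_free_without_gil(OwnerT &owner,
                               std::vector<sycl::event> &host_task_events)
{
    py::gil_scoped_release release; // drop the GIL before blocking calls

    sycl::event::wait(host_task_events); // waiting may block for a long time
    owner.reset(nullptr);                // deleter (sycl::free) runs here,
                                         // still with the GIL released
} // the GIL is re-acquired when `release` is destroyed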
48 changes: 18 additions & 30 deletions dpctl/tensor/libtensor/source/accumulators/accumulate_over_axis.hpp
@@ -200,18 +200,18 @@ py_accumulate_over_axis(const dpctl::tensor::usm_ndarray &src,
     }
 
     using dpctl::tensor::offset_utils::device_allocate_and_pack;
-    const auto &ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
+    auto ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
         exec_q, host_task_events, simplified_iter_shape,
         simplified_iter_src_strides, simplified_iter_dst_strides, acc_shape,
         acc_src_strides, acc_dst_strides);
-    py::ssize_t *packed_shapes_and_strides = std::get<0>(ptr_size_event_tuple);
-    if (packed_shapes_and_strides == nullptr) {
-        throw std::runtime_error("Unexpected error");
-    }
+    auto packed_shapes_and_strides_owner =
+        std::move(std::get<0>(ptr_size_event_tuple));
     const auto &copy_shapes_strides_ev = std::get<2>(ptr_size_event_tuple);
+    const py::ssize_t *packed_shapes_and_strides =
+        packed_shapes_and_strides_owner.get();
 
-    py::ssize_t *iter_shape_and_strides = packed_shapes_and_strides;
-    py::ssize_t *acc_shapes_and_strides =
+    const py::ssize_t *iter_shape_and_strides = packed_shapes_and_strides;
+    const py::ssize_t *acc_shapes_and_strides =
         packed_shapes_and_strides + 3 * simplified_iter_shape.size();
 
     std::vector<sycl::event> all_deps;
@@ -224,14 +218,8 @@ py_accumulate_over_axis(const dpctl::tensor::usm_ndarray &src,
         iter_shape_and_strides, iter_src_offset, iter_dst_offset, acc_nd,
         acc_shapes_and_strides, dst_data, host_task_events, all_deps);
 
-    sycl::event temp_cleanup_ev = exec_q.submit([&](sycl::handler &cgh) {
-        cgh.depends_on(acc_ev);
-        const auto &ctx = exec_q.get_context();
-        using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-        cgh.host_task([ctx, packed_shapes_and_strides] {
-            sycl_free_noexcept(packed_shapes_and_strides, ctx);
-        });
-    });
+    sycl::event temp_cleanup_ev = dpctl::tensor::alloc_utils::async_smart_free(
+        exec_q, {acc_ev}, packed_shapes_and_strides_owner);
     host_task_events.push_back(temp_cleanup_ev);
 
     return std::make_pair(
@@ -384,18 +378,18 @@ std::pair<sycl::event, sycl::event> py_accumulate_final_axis_include_initial(
     }
 
     using dpctl::tensor::offset_utils::device_allocate_and_pack;
-    const auto &ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
+    auto ptr_size_event_tuple = device_allocate_and_pack<py::ssize_t>(
         exec_q, host_task_events, simplified_iter_shape,
         simplified_iter_src_strides, simplified_iter_dst_strides, acc_shape,
         acc_src_strides, acc_dst_strides);
-    py::ssize_t *packed_shapes_and_strides = std::get<0>(ptr_size_event_tuple);
-    if (packed_shapes_and_strides == nullptr) {
-        throw std::runtime_error("Unexpected error");
-    }
+    auto packed_shapes_and_strides_owner =
+        std::move(std::get<0>(ptr_size_event_tuple));
     const auto &copy_shapes_strides_ev = std::get<2>(ptr_size_event_tuple);
+    const py::ssize_t *packed_shapes_and_strides =
+        packed_shapes_and_strides_owner.get();
 
-    py::ssize_t *iter_shape_and_strides = packed_shapes_and_strides;
-    py::ssize_t *acc_shapes_and_strides =
+    const py::ssize_t *iter_shape_and_strides = packed_shapes_and_strides;
+    const py::ssize_t *acc_shapes_and_strides =
         packed_shapes_and_strides + 3 * simplified_iter_shape.size();
 
     std::vector<sycl::event> all_deps;
@@ -408,14 +402,8 @@ std::pair<sycl::event, sycl::event> py_accumulate_final_axis_include_initial(
         iter_shape_and_strides, iter_src_offset, iter_dst_offset, acc_nd,
         acc_shapes_and_strides, dst_data, host_task_events, all_deps);
 
-    sycl::event temp_cleanup_ev = exec_q.submit([&](sycl::handler &cgh) {
-        cgh.depends_on(acc_ev);
-        const auto &ctx = exec_q.get_context();
-        using dpctl::tensor::alloc_utils::sycl_free_noexcept;
-        cgh.host_task([ctx, packed_shapes_and_strides] {
-            sycl_free_noexcept(packed_shapes_and_strides, ctx);
-        });
-    });
+    sycl::event temp_cleanup_ev = dpctl::tensor::alloc_utils::async_smart_free(
+        exec_q, {acc_ev}, packed_shapes_and_strides_owner);
     host_task_events.push_back(temp_cleanup_ev);
 
     return std::make_pair(
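
async_smart_free (declared in "utils/sycl_alloc_utils.hpp") replaces the hand-written submit/host_task/free block in the two hunks above: it schedules a host task that frees the owned USM allocations once the supplied events complete, and returns that task's event. dpctl's version is variadic; the single-pointer sketch below uses a hypothetical name and shows only the underlying idea, not the library's exact implementation.

#include <memory>
#include <vector>

#include <sycl/sycl.hpp>

// hypothetical single-pointer variant of an async_smart_free-style helper
template <typename T, typename DeleterT>
sycl::event async_smart_free_one(sycl::queue &q,
                                 const std::vector<sycl::event> &depends,
                                 std::unique_ptr<T, DeleterT> &owner)
{
    // detach the raw pointer: the caller's unique_ptr must not free it,
    // because the allocation has to outlive the dependent kernels
    T *ptr = owner.release();
    sycl::context ctx = q.get_context();

    return q.submit([&](sycl::handler &cgh) {
        cgh.depends_on(depends);
        // the host task runs only after `depends` complete, then frees
        // the allocation on the context it was allocated against
        cgh.host_task([ptr, ctx]() { sycl::free(ptr, ctx); });
    });
}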
(Diffs for the remaining 18 changed files are not shown.)