[PyOV] Fix hanging on infer request destruction (openvinotoolkit#24722)
### Details:
- Initial problem: `test_custom_op` hung on destruction because it was
waiting for a thread that was trying to acquire the GIL.
- The second problem is that pybind11 only allows managing the GIL within
the current scope, and the GIL cannot be released from within
destructors (pybind/pybind11#1446).
- The current solution releases the GIL for the `InferRequest` destructor
and for all destructors it calls down the chain; see the sketch below.
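The core of the fix is a custom-deleter pattern: the wrapped C++ object is held through a `std::shared_ptr` whose deleter drops the GIL for the duration of the destructor, so any waiting done inside the destructor chain cannot deadlock against a Python thread. Below is a minimal sketch of the idea, not the actual OpenVINO code; `HeavyObject` and `make_gil_safe_holder` are illustrative names, and it assumes the deleter runs on a thread that currently holds the GIL (e.g. during Python garbage collection).

```cpp
#include <memory>
#include <pybind11/pybind11.h>

namespace py = pybind11;

// Illustrative stand-in for an object whose destructor may block,
// e.g. by waiting for worker threads that themselves need the GIL.
struct HeavyObject {
    ~HeavyObject() { /* joins worker threads */ }
};

// Hold the object via a shared_ptr with a custom deleter: when the Python
// wrapper is destroyed, the deleter temporarily releases the GIL so the
// whole destructor chain runs without holding it.
std::shared_ptr<HeavyObject> make_gil_safe_holder() {
    return std::shared_ptr<HeavyObject>(new HeavyObject(), [](HeavyObject* ptr) {
        py::gil_scoped_release release;  // assumes the GIL is held here
        delete ptr;
    });
}
```

This also explains the `m_request.` → `m_request->` changes in the diff: the wrapper now appears to hold the request behind a smart pointer rather than by value.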

### Tickets:
 - CVS-141744
akuporos authored Jun 7, 2024
1 parent bb179c6 commit ba5c45a
Showing 8 changed files with 95 additions and 62 deletions.
22 changes: 13 additions & 9 deletions src/bindings/python/src/pyopenvino/core/async_infer_queue.cpp
@@ -15,6 +15,7 @@

#include "pyopenvino/core/common.hpp"
#include "pyopenvino/core/infer_request.hpp"
#include "pyopenvino/utils/utils.hpp"

namespace py = pybind11;

@@ -64,7 +65,7 @@ class AsyncInferQueue {
});
size_t idle_handle = m_idle_handles.front();
// wait for request to make sure it returned from callback
- m_requests[idle_handle].m_request.wait();
+ m_requests[idle_handle].m_request->wait();
if (m_errors.size() > 0)
throw m_errors.front();
return idle_handle;
@@ -75,7 +76,7 @@
// release GIL to avoid deadlock on python callback
py::gil_scoped_release release;
for (auto&& request : m_requests) {
- request.m_request.wait();
+ request.m_request->wait();
}
// acquire the mutex to access m_errors
std::lock_guard<std::mutex> lock(m_mutex);
@@ -87,7 +88,7 @@
for (size_t handle = 0; handle < m_requests.size(); handle++) {
// auto end_time = m_requests[handle].m_end_time; // TODO: pass it bellow? like in InferRequestWrapper

- m_requests[handle].m_request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
+ m_requests[handle].m_request->set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
*m_requests[handle].m_end_time = Time::now();
{
// acquire the mutex to access m_idle_handles
@@ -110,14 +111,17 @@
}

void set_custom_callbacks(py::function f_callback) {
+ // need to acquire GIL before py::function deletion
+ auto callback_sp = Common::utils::wrap_pyfunction(std::move(f_callback));
+
for (size_t handle = 0; handle < m_requests.size(); handle++) {
- m_requests[handle].m_request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
+ m_requests[handle].m_request->set_callback([this, callback_sp, handle](std::exception_ptr exception_ptr) {
*m_requests[handle].m_end_time = Time::now();
if (exception_ptr == nullptr) {
// Acquire GIL, execute Python function
py::gil_scoped_acquire acquire;
try {
- f_callback(m_requests[handle], m_user_ids[handle]);
+ (*callback_sp)(m_requests[handle], m_user_ids[handle]);
} catch (const py::error_already_set& py_error) {
// This should behave the same as assert(!PyErr_Occurred())
// since constructor for pybind11's error_already_set is
@@ -193,13 +197,13 @@ void regclass_AsyncInferQueue(py::module m) {
// Set new inputs label/id from user
self.m_user_ids[handle] = userdata;
// Update inputs if there are any
- self.m_requests[handle].m_request.set_input_tensor(inputs);
+ self.m_requests[handle].m_request->set_input_tensor(inputs);
// Now GIL can be released - we are NOT working with Python objects in this block
{
py::gil_scoped_release release;
*self.m_requests[handle].m_start_time = Time::now();
// Start InferRequest in asynchronus mode
- self.m_requests[handle].m_request.start_async();
+ self.m_requests[handle].m_request->start_async();
}
},
py::arg("inputs"),
@@ -239,13 +243,13 @@ void regclass_AsyncInferQueue(py::module m) {
// Set new inputs label/id from user
self.m_user_ids[handle] = userdata;
// Update inputs if there are any
- Common::set_request_tensors(self.m_requests[handle].m_request, inputs);
+ Common::set_request_tensors(*self.m_requests[handle].m_request, inputs);
// Now GIL can be released - we are NOT working with Python objects in this block
{
py::gil_scoped_release release;
*self.m_requests[handle].m_start_time = Time::now();
// Start InferRequest in asynchronus mode
- self.m_requests[handle].m_request.start_async();
+ self.m_requests[handle].m_request->start_async();
}
},
py::arg("inputs"),
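In `set_custom_callbacks`, the Python callback is no longer captured by value; it is first passed through `Common::utils::wrap_pyfunction`, whose implementation is not part of this diff. Judging by the analogous deleter added to `common.cpp` below, a plausible sketch of such a wrapper would acquire the GIL before the `py::function` is destroyed, since the last copy of the callback can be dropped from a non-Python thread. The name `wrap_pyfunction_sketch` and the exact signature are illustrative assumptions, not the real helper:

```cpp
#include <memory>
#include <utility>
#include <pybind11/pybind11.h>

namespace py = pybind11;

// Sketch of a GIL-safe wrapper for a Python callback: the shared_ptr's
// deleter re-acquires the GIL so that destroying the py::function
// (which decrements a Python refcount) is safe on any thread.
inline std::shared_ptr<py::function> wrap_pyfunction_sketch(py::function f) {
    auto* raw = new py::function(std::move(f));
    return std::shared_ptr<py::function>(raw, [](py::function* ptr) {
        py::gil_scoped_acquire acquire;  // hold the GIL while the refcount drops
        delete ptr;
    });
}
```

The callback lambda then captures the shared pointer by value and invokes `(*callback_sp)(...)` under `py::gil_scoped_acquire`, as the diff above shows.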
8 changes: 6 additions & 2 deletions src/bindings/python/src/pyopenvino/core/common.cpp
@@ -433,10 +433,14 @@ ov::op::v0::Constant create_shared(py::array& array) {
// If ndim is equal to 0, creates scalar Constant.
// If size is equal to 0, creates empty Constant.
if (array_helpers::is_contiguous(array)) {
- auto memory = std::make_shared<ov::SharedBuffer<py::array>>(
+ auto buffer = new ov::SharedBuffer<py::array>(
static_cast<char*>((array.ndim() == 0 || array.size() == 0) ? array.mutable_data() : array.mutable_data(0)),
array.ndim() == 0 ? array.itemsize() : array.nbytes(),
array);
+ std::shared_ptr<ov::SharedBuffer<py::array>> memory(buffer, [](ov::SharedBuffer<py::array>* buffer) {
+ py::gil_scoped_acquire acquire;
+ delete buffer;
+ });
return ov::op::v0::Constant(type_helpers::get_ov_type(array), array_helpers::get_shape(array), memory);
}
// If passed array is not C-style, throw an error.
@@ -614,7 +618,7 @@ uint32_t get_optimal_number_of_requests(const ov::CompiledModel& actual) {
py::dict outputs_to_dict(InferRequestWrapper& request, bool share_outputs, bool decode_strings) {
py::dict res;
for (const auto& out : request.m_outputs) {
- auto t = request.m_request.get_tensor(out);
+ auto t = request.m_request->get_tensor(out);
if (t.get_element_type() == ov::element::string) {
if (share_outputs) {
PyErr_WarnEx(PyExc_RuntimeWarning, "Result of a string type will be copied to OVDict!", 1);