Skip to content

Commit

Permalink
NPUW: Unfold infer requests (openvinotoolkit#27319)
Browse files Browse the repository at this point in the history
### Details:
 - *item1*
 - *...*

### Tickets:
 - E-140517
  • Loading branch information
dmatveev authored and smirnov-alexey committed Nov 29, 2024
1 parent 9aabccb commit 3d8b5eb
Show file tree
Hide file tree
Showing 11 changed files with 621 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ DEFINE_OPT(NPUW_WEIGHTS_BANK, std::string, "", npuw::weights_bank, CompileTime);
DEFINE_OPT(NPUW_WEIGHTS_BANK_ALLOC, std::string, "", npuw::weights_bank_alloc, CompileTime);
DEFINE_OPT(NPUW_CACHE_DIR, std::string, "", npuw::cache_dir, CompileTime);
DEFINE_OPT(NPUW_FUNCALL_ASYNC, bool, false, npuw::funcall_async, RunTime);
DEFINE_OPT(NPUW_UNFOLD_IREQS, bool, false, npuw::unfold_ireqs, RunTime);
DEFINE_OPT(NPUW_ACC_CHECK, bool, false, npuw::accuracy::check, RunTime);
DEFINE_OPT(NPUW_ACC_THRESH, double, 0.01, npuw::accuracy::threshold, RunTime);
DEFINE_OPT(NPUW_ACC_DEVICE, std::string, "", npuw::accuracy::reference_device, RunTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ static constexpr ov::Property<bool> parallel_compilation{"NPUW_PARALLEL_COMPILE"
*/
static constexpr ov::Property<bool> funcall_async{"NPUW_FUNCALL_ASYNC"};

/**
* @brief
* Type: boolean
* Create individual infer requests for partitiongs, even repeating.
* Default value: false.
*/
static constexpr ov::Property<bool> unfold_ireqs{"NPUW_UNFOLD_IREQS"};

namespace accuracy {
/**
* @brief
Expand Down
1 change: 1 addition & 0 deletions src/plugins/intel_npu/src/al/src/config/npuw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void intel_npu::registerNPUWOptions(OptionsDesc& desc) {
desc.add<NPUW_FUNCALL_FOR_ALL>();
desc.add<NPUW_PARALLEL_COMPILE>();
desc.add<NPUW_FUNCALL_ASYNC>();
desc.add<NPUW_UNFOLD_IREQS>();
desc.add<NPUW_WEIGHTS_BANK>();
desc.add<NPUW_WEIGHTS_BANK_ALLOC>();
desc.add<NPUW_CACHE_DIR>();
Expand Down
291 changes: 285 additions & 6 deletions src/plugins/intel_npu/src/plugin/npuw/base_sync_infer_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "compiled_model.hpp"
#include "intel_npu/config/npuw.hpp"
#include "logging.hpp"
#include "openvino/core/parallel.hpp"
#include "util.hpp"

ov::npuw::IBaseInferRequest::IBaseInferRequest(const std::shared_ptr<ov::npuw::CompiledModel>& compiled_model)
Expand Down Expand Up @@ -58,12 +59,8 @@ ov::npuw::IBaseInferRequest::RqPtrs ov::npuw::IBaseInferRequest::create_infer_re
LOG_INFO("- Trying next device...");
comp_model_desc.device_it++;
can_try_again = m_npuw_model->compile_for_success(id);
if (can_try_again) {
if (recompiled)
*recompiled = true;
// Probably shouldn't be called all the time, but only if
// I/O submodel is affected
m_npuw_model->reset_io();
if (can_try_again && recompiled) {
*recompiled = true;
}
}
} // while(!new_ireq && can_try_again)
Expand Down Expand Up @@ -178,6 +175,33 @@ void ov::npuw::IBaseInferRequest::check_tensors() const {
return;
}

std::vector<ov::SoPtr<ov::IVariableState>> ov::npuw::IBaseInferRequest::query_state() const {
std::vector<ov::SoPtr<ov::IVariableState>> variable_states = {};
for (const auto& request : m_subrequests) {
if (!request) // optimized out
continue;
for (auto&& state : request->query_state()) {
if (!state._so)
state._so = request._so;
variable_states.emplace_back(state);
}
}
return variable_states;
}

std::vector<ov::ProfilingInfo> ov::npuw::IBaseInferRequest::get_profiling_info() const {
std::vector<ov::ProfilingInfo> info;
for (size_t i = 0; i < m_subrequests.size(); ++i) {
if (!m_subrequests[i]) // optimized out
continue;
auto&& subreq_info = m_subrequests[i]->get_profiling_info();
for (auto&& rec : subreq_info)
rec.node_name = std::string("subgraph") + std::to_string(i) + ": " + rec.node_name;
info.insert(info.end(), subreq_info.begin(), subreq_info.end());
}
return info;
}

void ov::npuw::IBaseInferRequest::infer() {
m_now_idx.reset();
prepare_for_infer();
Expand Down Expand Up @@ -209,6 +233,261 @@ void ov::npuw::IBaseInferRequest::infer() {
m_now_idx.reset();
}

std::size_t ov::npuw::IBaseInferRequest::total_subrequests() const {
return m_subrequests.size();
}

ov::npuw::TensorPtr ov::npuw::IBaseInferRequest::allocMem(const ov::element::Type type,
const ov::Shape& shape,
const std::string& device) {
if (device == "CPU" || ov::shape_size(shape) == 0) {
return ov::get_tensor_impl(ov::Tensor(type, shape));
}

auto remote_ctx = m_npuw_model->get_plugin()->get_core()->get_default_context(device)._ptr;
auto remote_tensor = remote_ctx->create_host_tensor(type, shape);
return ov::get_tensor_impl(ov::make_tensor(remote_tensor));
}

ov::npuw::TensorPtr ov::npuw::IBaseInferRequest::allocOut(const ov::Output<const ov::Node>& node,
const std::string& device) {
return allocMem(node.get_element_type(), node.get_shape(), device);
}

void ov::npuw::IBaseInferRequest::alloc_io() {
// Preallocate input tensors
LOG_INFO("Preallocating input tensors...");
for (size_t i = 0; i < m_npuw_model->inputs().size(); i++) {
const auto& port = m_npuw_model->inputs()[i];
ov::SoPtr<ov::ITensor> allocated = allocOut(port, m_npuw_model->global_mem_device());
m_input_tensors.push_back(allocated);
m_input_allocated.insert(allocated->data());
m_port_to_tensor[port] = TensorStorage{m_input_tensors.back(), true};
} // for(inputs)

// Preallocate output tensors
LOG_INFO("Preallocating output tensors...");
for (size_t i = 0; i < m_npuw_model->outputs().size(); i++) {
LOG_BLOCK();
const auto& port = m_npuw_model->outputs()[i];
LOG_INFO("Output " << i << " of " << m_npuw_model->outputs().size() << ": " << port);

// FIXME: Yes, the CompiledModel::ToSubmodel == JustInferRequest::LinkFrom
const auto& from_submodel = m_npuw_model->m_outputs_to_submodels_outputs.at(i);
LOG_INFO("Produced by Subgraph[" << from_submodel.first << "] / " << from_submodel.second);

auto tensor = alloc_global_out(i);
m_output_tensors.push_back(tensor);
m_port_to_tensor[port] = TensorStorage{tensor, true};
}
}

ov::npuw::TensorPtr ov::npuw::IBaseInferRequest::alloc_global_out(std::size_t out_idx) {
const auto& port = m_npuw_model->outputs().at(out_idx);
return allocOut(port, m_npuw_model->global_mem_device());
}

void ov::npuw::IBaseInferRequest::init_gio() {
// Build the parameter/result mapping
m_subrequests_gio.resize(m_subrequests.size());

// Parameters: stage 1...
for (size_t i = 0; i < m_npuw_model->inputs().size(); i++) {
const auto& to_submodel = m_npuw_model->m_inputs_to_submodels_inputs.at(i);
if (to_submodel != CompiledModel::NO_LINK) {
std::size_t sub_idx{}, in_idx{};
std::tie(sub_idx, in_idx) = to_submodel;
m_subrequests_gio.at(sub_idx).global_params[i] = in_idx;
}
} // for(inputs)

// Parameters: stage 2...
for (auto&& it : m_npuw_model->m_param_subscribers) {
const auto param_idx = it.first;
for (auto&& to_submodel : it.second) {
std::size_t sub_idx{}, in_idx{};
std::tie(sub_idx, in_idx) = to_submodel;
m_subrequests_gio.at(sub_idx).global_params[param_idx] = in_idx;
}
}

// Results
for (size_t i = 0; i < m_npuw_model->outputs().size(); i++) {
std::size_t sub_idx{}, out_idx{};
std::tie(sub_idx, out_idx) = m_npuw_model->m_outputs_to_submodels_outputs.at(i);
m_subrequests_gio.at(sub_idx).global_results[i] = out_idx;
}
}

void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request) {
auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];

NPUW_ASSERT(comp_model_desc.replaced_by);
const auto real_idx = comp_model_desc.replaced_by.value();
auto& func_desc = m_npuw_model->m_compiled_submodels[real_idx];

// Bind extra parameters from the function's closure
// First, do easy things & delay heavy stuff
std::vector<std::size_t> closure_unpack_required;
std::vector<std::size_t> closure_copy_required;

for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.size(); cidx++) {
auto& closure = comp_model_desc.closure[cidx];
const auto closure_param_id = comp_model_desc.param_base + cidx;

if (m_npuw_model->is_gather_closure(idx, cidx)) {
// No need to set/copy the host_gather's closure tensor int
// the subrequest - it is just a dummy. host_gather writes
// to the right buffer directly.
continue;
}

auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
if (m_npuw_model->unpack_required(idx, cidx)) {
// Remember where the unpack is required
closure_unpack_required.push_back(cidx);
} else {
if (needs_copy(idx, cidx)) {
// Remember where copy is requried
closure_copy_required.push_back(cidx);
} else {
// Easy case, just set one to another
request->set_tensor(iport, ov::get_tensor_impl(closure));
}
}
} // for(closure)

// m_ms_unpack += ov::npuw::perf::ms_to_run([&](){
ov::parallel_for(closure_copy_required.size(), [&](std::size_t j) {
auto cidx = closure_copy_required[j];
auto& closure = comp_model_desc.closure[cidx];
const auto closure_param_id = comp_model_desc.param_base + cidx;
auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
auto clparam = request->get_tensor(iport);
ov::get_tensor_impl(closure)->copy_to(clparam._ptr);
});
// }); // ms_to_run

for (std::size_t j = 0; j != closure_unpack_required.size(); j++) {
// NB: No need to protect anything here as containers are all
// preallocated and we only access elements under particular (thread
// -local) indices.
auto cidx = closure_unpack_required[j];

// FIXME: zerops are stored with absolute indexing, this needs to be aligned
auto& closure = comp_model_desc.closure[cidx];

const auto closure_param_id = comp_model_desc.param_base + cidx;
auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
auto clparam = request->get_tensor(iport);

if (!comp_model_desc.scales.empty() && comp_model_desc.scales[cidx] && comp_model_desc.zerops[cidx]) {
// Unpacking this weight requires scaling with zero points...
ov::npuw::util::unpack(ov::get_tensor_impl(closure),
ov::get_tensor_impl(comp_model_desc.zerops[cidx]),
ov::get_tensor_impl(comp_model_desc.scales[cidx]),
clparam);
} else if (!comp_model_desc.scales.empty() && comp_model_desc.scales[cidx]) {
// Unpacking this weight requires scaling
ov::npuw::util::unpack(ov::get_tensor_impl(closure),
ov::get_tensor_impl(comp_model_desc.scales[cidx]),
clparam);
} else {
// Unpacking this weight doesn't require scaling
ov::npuw::util::unpack(ov::get_tensor_impl(closure), clparam);
}
}
}

void ov::npuw::IBaseInferRequest::bind_global_params(std::size_t idx, RqPtr request) {
LOG_DEBUG("Binding parameters for Subgraph[" << idx << "]");
LOG_BLOCK();

auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];
const auto real_idx = comp_model_desc.replaced_by.value_or(idx);

const bool do_copy = needs_copy(idx);
const auto& iodesc = m_subrequests_gio.at(idx);

const auto& proto_comp_model_desc = m_npuw_model->m_compiled_submodels[real_idx];
const bool is_spatial = proto_comp_model_desc.spatial.has_value();

// a list of ports to copy tensors, if needed: FROM -> TO
std::vector<std::pair<ov::SoPtr<ov::ITensor>, ov::Output<const ov::Node>>> copy_list;

// Check if the given subgraph's input is spatial
auto is_spatial_param = [&](std::size_t sub_in_idx) -> bool {
if (!is_spatial) {
return false; // Early return
}
auto& spatial = proto_comp_model_desc.spatial.value();
return std::any_of(spatial.params.begin(), spatial.params.end(), [&](const auto& p) -> bool {
return p.idx == sub_in_idx;
});
};

for (auto&& it : iodesc.global_params) {
std::size_t param_idx{}, sub_in_idx{};
std::tie(param_idx, sub_in_idx) = it;
LOG_DEBUG("Processing " << param_idx << " -> " << sub_in_idx << std::endl);

const auto& g_port = m_npuw_model->inputs()[param_idx];
const auto& g_tnsr = m_port_to_tensor.at(g_port).tensor;
const auto& s_port = request->get_inputs()[sub_in_idx];
LOG_DEBUG("Processing " << g_port << " -> " << s_port << "...");
LOG_BLOCK();
if (!is_spatial_param(sub_in_idx)) {
// Input parameter is non-spatial, do normal handling
if (m_input_allocated.count(g_tnsr->data()) == 0 && do_copy) {
LOG_DEBUG("Will be copied");
copy_list.emplace_back(g_tnsr, s_port);
} else {
LOG_DEBUG("Will be set");
request->set_tensor(s_port, g_tnsr);
}
} else {
// Register for future use
m_spatial_io[real_idx].inputs.at(sub_in_idx) = g_tnsr;
}
}

LOG_DEBUG("Running copy...");
ov::parallel_for(copy_list.size(), [&](std::size_t idx) {
auto& it = copy_list[idx];
ov::SoPtr<ov::ITensor> dst = request->get_tensor(it.second);
it.first->copy_to(dst._ptr);
});

// Run host-side gather, if required
if (comp_model_desc.host_gather.dst_idx != -1) {
const auto& gport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.dst_idx];
const auto gather = request->get_tensor(gport);

const auto& vocab = comp_model_desc.closure[comp_model_desc.host_gather.src_idx - comp_model_desc.param_base];
const auto& lport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.idx_idx];
const auto lookup = request->get_tensor(lport);
ov::npuw::util::gather(ov::get_tensor_impl(vocab), lookup, gather);
}

LOG_DEBUG("Done");
}

void ov::npuw::IBaseInferRequest::bind_global_results(std::size_t idx, RqPtr request) {
LOG_DEBUG("Binding results for Subgraph[" << idx << "]");
LOG_BLOCK();

const auto& iodesc = m_subrequests_gio.at(idx);
for (auto&& it : iodesc.global_results) {
std::size_t result_idx{}, sub_out_idx{};
std::tie(result_idx, sub_out_idx) = it;
const auto& g_port = m_npuw_model->outputs()[result_idx];
const auto& s_port = request->get_outputs()[sub_out_idx];
request->set_tensor(s_port, m_port_to_tensor.at(g_port).tensor);
}

LOG_DEBUG("Done");
}

void ov::npuw::IBaseInferRequest::dump_input_tensors(std::size_t idx) {
const std::string dump_ios_opt = m_npuw_model->m_cfg.get<::intel_npu::NPUW_DUMP_IO>();
const std::size_t end_idx = m_npuw_model->m_compiled_submodels.size();
Expand Down
Loading

0 comments on commit 3d8b5eb

Please sign in to comment.