Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additive read distributed #1650

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions common/cuda_hip/distributed/matrix_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

#include "common/cuda_hip/base/thrust.hpp"
#include "common/cuda_hip/components/atomic.hpp"
#include "common/unified/base/kernel_launch.hpp"
#include "core/components/format_conversion_kernels.hpp"
#include "core/components/prefix_sum_kernels.hpp"


namespace gko {
Expand Down Expand Up @@ -49,6 +52,114 @@ struct input_type {
};


/**
 * Counts, for every part of the row partition, how many of the local input
 * entries are owned by that part (i.e. have to be communicated), and computes
 * the scatter information needed to fill the send buffers afterwards.
 *
 * @param exec  the executor the kernels are run on
 * @param input  the locally available matrix assembly data (global indices)
 * @param row_partition  the row partition that defines entry ownership
 * @param local_part  the part id of the calling process
 * @param send_count  output: for each part, the number of entries to send to
 *                    it; 0 for the local part
 * @param send_positions  output: exclusive prefix sum over the "needs to be
 *                        sent" flags, i.e. the destination index of each sent
 *                        entry in the send buffer (in part-id-sorted order)
 * @param original_positions  output: for each entry in part-id-sorted order,
 *                            its index in the input data, or -1 if the entry
 *                            is owned by the local part
 */
template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void count_non_owning_entries(
    std::shared_ptr<const DefaultExecutor> exec,
    const device_matrix_data<ValueType, GlobalIndexType>& input,
    const experimental::distributed::Partition<LocalIndexType, GlobalIndexType>*
        row_partition,
    comm_index_type local_part, array<comm_index_type>& send_count,
    array<GlobalIndexType>& send_positions,
    array<GlobalIndexType>& original_positions)
{
    auto row_part_ids = row_partition->get_part_ids();
    const auto* row_range_bounds = row_partition->get_range_bounds();
    const auto num_row_ranges = row_partition->get_num_ranges();
    const auto num_input_elements = input.get_num_stored_elements();

    auto policy = thrust_policy(exec);

    // precompute the row range id of each input element by binary search in
    // the range bounds; bounds[0] is skipped, so the upper_bound result
    // is directly the 0-based range id
    auto input_row_idxs = input.get_const_row_idxs();
    array<size_type> row_range_ids{exec, num_input_elements};
    thrust::upper_bound(policy, row_range_bounds + 1,
                        row_range_bounds + num_row_ranges + 1, input_row_idxs,
                        input_row_idxs + num_input_elements,
                        row_range_ids.get_data());

    // map each entry to the part id that owns its row, and record the entry's
    // original position (-1 marks entries that are already locally owned)
    array<comm_index_type> row_part_ids_per_entry{exec, num_input_elements};
    run_kernel(
        exec,
        [] GKO_KERNEL(auto i, auto part_id, auto part_ids, auto range_ids,
                      auto part_ids_per_entry, auto orig_positions) {
            part_ids_per_entry[i] = part_ids[range_ids[i]];
            orig_positions[i] = part_ids_per_entry[i] == part_id ? -1 : i;
        },
        num_input_elements, local_part, row_part_ids, row_range_ids.get_data(),
        row_part_ids_per_entry.get_data(), original_positions.get_data());

    // group the original positions by owning part id; the sort is stable, so
    // the relative input order within each part is preserved
    thrust::stable_sort_by_key(
        policy, row_part_ids_per_entry.get_data(),
        row_part_ids_per_entry.get_data() + num_input_elements,
        original_positions.get_data());
    // flag every entry that has to be sent (original position >= 0)
    run_kernel(
        exec,
        [] GKO_KERNEL(auto i, auto orig_positions, auto s_positions) {
            s_positions[i] = orig_positions[i] >= 0 ? 1 : 0;
        },
        num_input_elements, original_positions.get_const_data(),
        send_positions.get_data());

    // exclusive prefix sum turns the flags into send-buffer positions
    components::prefix_sum_nonnegative(exec, send_positions.get_data(),
                                       num_input_elements);
    // build per-part pointers over the sorted part ids, so that
    // part_ptrs[p+1] - part_ptrs[p] is the number of entries owned by part p
    size_type num_parts = row_partition->get_num_parts();
    array<comm_index_type> row_part_ptrs{exec, num_parts + 1};
    row_part_ptrs.fill(0);
    components::convert_idxs_to_ptrs(
        exec, row_part_ids_per_entry.get_const_data(), num_input_elements,
        num_parts, row_part_ptrs.get_data());

    // per-part send counts; nothing is sent to the local part itself
    run_kernel(
        exec,
        [] GKO_KERNEL(auto i, auto part_id, auto part_ptrs, auto count) {
            count[i] = i == part_id ? 0 : part_ptrs[i + 1] - part_ptrs[i];
        },
        num_parts, local_part, row_part_ptrs.get_data(), send_count.get_data());
}

GKO_INSTANTIATE_FOR_EACH_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_COUNT_NON_OWNING_ENTRIES);


/**
 * Scatters all input entries that are owned by another part into the send
 * buffers, using the positions computed by count_non_owning_entries.
 *
 * For index i (in part-id-sorted order), original_positions[i] is the entry's
 * index in the input data (-1 for locally owned entries, which are skipped)
 * and send_positions[i] is its destination index in the send buffers.
 *
 * @note row_partition and local_part are part of the kernel interface but are
 *       not needed here; the ownership decision was already made when the
 *       position arrays were computed.
 */
template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void fill_send_buffers(
    std::shared_ptr<const DefaultExecutor> exec,
    const device_matrix_data<ValueType, GlobalIndexType>& input,
    const experimental::distributed::Partition<LocalIndexType, GlobalIndexType>*
        row_partition,
    comm_index_type local_part, const array<GlobalIndexType>& send_positions,
    const array<GlobalIndexType>& original_positions,
    array<GlobalIndexType>& send_row_idxs,
    array<GlobalIndexType>& send_col_idxs, array<ValueType>& send_values)
{
    const auto num_input_elements = input.get_num_stored_elements();
    const auto* row_idxs = input.get_const_row_idxs();
    const auto* col_idxs = input.get_const_col_idxs();
    const auto* values = input.get_const_values();

    run_kernel(
        exec,
        [] GKO_KERNEL(auto i, auto rows, auto cols, auto vals, auto src_pos,
                      auto dst_pos, auto buf_rows, auto buf_cols,
                      auto buf_vals) {
            const auto src = src_pos[i];
            // src < 0 marks a locally owned entry that is not sent anywhere
            if (src >= 0) {
                const auto dst = dst_pos[i];
                buf_rows[dst] = rows[src];
                buf_cols[dst] = cols[src];
                buf_vals[dst] = vals[src];
            }
        },
        num_input_elements, row_idxs, col_idxs, values,
        original_positions.get_const_data(), send_positions.get_const_data(),
        send_row_idxs.get_data(), send_col_idxs.get_data(),
        send_values.get_data());
}

GKO_INSTANTIATE_FOR_EACH_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_FILL_SEND_BUFFERS);


template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void separate_local_nonlocal(
std::shared_ptr<const DefaultExecutor> exec,
Expand Down
3 changes: 3 additions & 0 deletions core/device_hooks/common_kernels.inc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ GKO_STUB_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(
namespace distributed_matrix {


GKO_STUB_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_COUNT_NON_OWNING_ENTRIES);
GKO_STUB_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(GKO_DECLARE_FILL_SEND_BUFFERS);
GKO_STUB_VALUE_AND_LOCAL_GLOBAL_INDEX_TYPE(GKO_DECLARE_SEPARATE_LOCAL_NONLOCAL);


Expand Down
119 changes: 108 additions & 11 deletions core/distributed/matrix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ginkgo/core/matrix/csr.hpp>
#include <ginkgo/core/matrix/diagonal.hpp>

#include "core/components/prefix_sum_kernels.hpp"
#include "core/distributed/matrix_kernels.hpp"


Expand All @@ -20,6 +21,10 @@ namespace matrix {
namespace {


GKO_REGISTER_OPERATION(count_non_owning_entries,
distributed_matrix::count_non_owning_entries);
GKO_REGISTER_OPERATION(fill_send_buffers,
distributed_matrix::fill_send_buffers);
GKO_REGISTER_OPERATION(separate_local_nonlocal,
distributed_matrix::separate_local_nonlocal);

Expand Down Expand Up @@ -243,7 +248,8 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
std::shared_ptr<const Partition<local_index_type, global_index_type>>
row_partition,
std::shared_ptr<const Partition<local_index_type, global_index_type>>
col_partition)
col_partition,
assembly_mode assembly_type)
{
const auto comm = this->get_communicator();
GKO_ASSERT_EQ(data.get_size()[0], row_partition->get_size());
Expand All @@ -252,14 +258,103 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
GKO_ASSERT_EQ(comm.size(), col_partition->get_num_parts());
auto exec = this->get_executor();
auto local_part = comm.rank();
auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
auto tmp_row_partition = make_temporary_clone(exec, row_partition);
auto tmp_col_partition = make_temporary_clone(exec, col_partition);

// set up LinOp sizes
auto num_parts = static_cast<size_type>(row_partition->get_num_parts());
auto global_num_rows = row_partition->get_size();
auto global_num_cols = col_partition->get_size();
dim<2> global_dim{global_num_rows, global_num_cols};
this->set_size(global_dim);

device_matrix_data<value_type, global_index_type> all_data{exec};
if (assembly_type == assembly_mode::communicate) {
size_type num_entries = data.get_num_stored_elements();
size_type num_parts = comm.size();
array<comm_index_type> send_sizes{exec, num_parts};
array<global_index_type> send_positions{exec, num_entries};
array<global_index_type> original_positions{exec, num_entries};
send_sizes.fill(0);
exec->run(matrix::make_count_non_owning_entries(
data, tmp_row_partition.get(), local_part, send_sizes,
send_positions, original_positions));

send_sizes.set_executor(exec->get_master());
array<comm_index_type> send_offsets{exec->get_master(), num_parts + 1};
array<comm_index_type> recv_sizes{exec->get_master(), num_parts};
array<comm_index_type> recv_offsets{exec->get_master(), num_parts + 1};

std::partial_sum(send_sizes.get_data(),
send_sizes.get_data() + num_parts,
send_offsets.get_data() + 1);
comm.all_to_all(exec, send_sizes.get_data(), 1, recv_sizes.get_data(),
1);
std::partial_sum(recv_sizes.get_data(),
recv_sizes.get_data() + num_parts,
recv_offsets.get_data() + 1);
send_offsets.get_data()[0] = 0;
recv_offsets.get_data()[0] = 0;

size_type n_send = send_offsets.get_data()[num_parts];
size_type n_recv = recv_offsets.get_data()[num_parts];
array<global_index_type> send_row_idxs{exec, n_send};
array<global_index_type> send_col_idxs{exec, n_send};
array<value_type> send_values{exec, n_send};
array<global_index_type> recv_row_idxs{exec, n_recv};
array<global_index_type> recv_col_idxs{exec, n_recv};
array<value_type> recv_values{exec, n_recv};
exec->run(matrix::make_fill_send_buffers(
data, tmp_row_partition.get(), local_part, send_positions,
original_positions, send_row_idxs, send_col_idxs, send_values));

if (use_host_buffer) {
send_row_idxs.set_executor(exec->get_master());
send_col_idxs.set_executor(exec->get_master());
send_values.set_executor(exec->get_master());
recv_row_idxs.set_executor(exec->get_master());
recv_col_idxs.set_executor(exec->get_master());
recv_values.set_executor(exec->get_master());
}
comm.all_to_all_v(use_host_buffer ? exec : exec->get_master(),
send_row_idxs.get_const_data(), send_sizes.get_data(),
send_offsets.get_data(), recv_row_idxs.get_data(),
recv_sizes.get_data(), recv_offsets.get_data());
comm.all_to_all_v(use_host_buffer ? exec : exec->get_master(),
send_col_idxs.get_const_data(), send_sizes.get_data(),
send_offsets.get_data(), recv_col_idxs.get_data(),
recv_sizes.get_data(), recv_offsets.get_data());
comm.all_to_all_v(use_host_buffer ? exec : exec->get_master(),
send_values.get_const_data(), send_sizes.get_data(),
send_offsets.get_data(), recv_values.get_data(),
recv_sizes.get_data(), recv_offsets.get_data());
if (use_host_buffer) {
recv_row_idxs.set_executor(exec);
recv_col_idxs.set_executor(exec);
recv_values.set_executor(exec);
}

array<global_index_type> all_row_idxs{exec, num_entries + n_recv};
MarcelKoch marked this conversation as resolved.
Show resolved Hide resolved
array<global_index_type> all_col_idxs{exec, num_entries + n_recv};
array<value_type> all_values{exec, num_entries + n_recv};
exec->copy_from(exec, num_entries, data.get_const_row_idxs(),
all_row_idxs.get_data());
MarcelKoch marked this conversation as resolved.
Show resolved Hide resolved
exec->copy_from(exec, n_recv, recv_row_idxs.get_data(),
all_row_idxs.get_data() + num_entries);
exec->copy_from(exec, num_entries, data.get_const_col_idxs(),
all_col_idxs.get_data());
exec->copy_from(exec, n_recv, recv_col_idxs.get_data(),
all_col_idxs.get_data() + num_entries);
exec->copy_from(exec, num_entries, data.get_const_values(),
all_values.get_data());
exec->copy_from(exec, n_recv, recv_values.get_data(),
all_values.get_data() + num_entries);
all_data = device_matrix_data<value_type, global_index_type>{
exec, global_dim, std::move(all_row_idxs), std::move(all_col_idxs),
std::move(all_values)};
all_data.sum_duplicates();
}

// temporary storage for the output
array<local_index_type> local_row_idxs{exec};
array<local_index_type> local_col_idxs{exec};
Expand All @@ -273,8 +368,8 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
// as well as the rows of the non-local block. The columns of the non-local
// block are still in global indices.
exec->run(matrix::make_separate_local_nonlocal(
data, make_temporary_clone(exec, row_partition).get(),
make_temporary_clone(exec, col_partition).get(), local_part,
assembly_type == assembly_mode::communicate ? all_data : data,
tmp_row_partition.get(), tmp_col_partition.get(), local_part,
local_row_idxs, local_col_idxs, local_values, non_local_row_idxs,
global_non_local_col_idxs, non_local_values));

Expand Down Expand Up @@ -335,7 +430,6 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
imap.get_executor(), imap.get_non_local_size(),
imap.get_remote_local_idxs().get_const_flat_data())
.copy_to_array();
auto use_host_buffer = mpi::requires_host_buffer(exec, comm);
if (use_host_buffer) {
recv_gather_idxs.set_executor(exec->get_master());
gather_idxs_.clear();
Expand All @@ -358,35 +452,38 @@ void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
std::shared_ptr<const Partition<local_index_type, global_index_type>>
row_partition,
std::shared_ptr<const Partition<local_index_type, global_index_type>>
col_partition)
col_partition,
assembly_mode assembly_type)
{
return this->read_distributed(
device_matrix_data<value_type, global_index_type>::create_from_host(
this->get_executor(), data),
row_partition, col_partition);
row_partition, col_partition, assembly_type);
}


template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
const matrix_data<ValueType, global_index_type>& data,
std::shared_ptr<const Partition<local_index_type, global_index_type>>
partition)
partition,
assembly_mode assembly_type)
{
return this->read_distributed(
device_matrix_data<value_type, global_index_type>::create_from_host(
this->get_executor(), data),
partition, partition);
partition, partition, assembly_type);
}


template <typename ValueType, typename LocalIndexType, typename GlobalIndexType>
void Matrix<ValueType, LocalIndexType, GlobalIndexType>::read_distributed(
const device_matrix_data<ValueType, GlobalIndexType>& data,
std::shared_ptr<const Partition<local_index_type, global_index_type>>
partition)
partition,
assembly_mode assembly_type)
{
return this->read_distributed(data, partition, partition);
return this->read_distributed(data, partition, partition, assembly_type);
}


Expand Down
43 changes: 38 additions & 5 deletions core/distributed/matrix_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,32 @@ namespace gko {
namespace kernels {


// Declares the kernel that counts, per part of the row partition, the number
// of local input entries owned by that part, together with the scatter
// positions used later to fill the send buffers.
#define GKO_DECLARE_COUNT_NON_OWNING_ENTRIES(ValueType, LocalIndexType, \
                                             GlobalIndexType) \
    void count_non_owning_entries( \
        std::shared_ptr<const DefaultExecutor> exec, \
        const device_matrix_data<ValueType, GlobalIndexType>& input, \
        const experimental::distributed::Partition< \
            LocalIndexType, GlobalIndexType>* row_partition, \
        comm_index_type local_part, array<comm_index_type>& send_count, \
        array<GlobalIndexType>& send_positions, \
        array<GlobalIndexType>& original_positions)


// Declares the kernel that scatters the non-locally-owned input entries into
// the row-index/column-index/value send buffers, using the positions computed
// by count_non_owning_entries.
#define GKO_DECLARE_FILL_SEND_BUFFERS(ValueType, LocalIndexType, \
                                      GlobalIndexType) \
    void fill_send_buffers( \
        std::shared_ptr<const DefaultExecutor> exec, \
        const device_matrix_data<ValueType, GlobalIndexType>& input, \
        const experimental::distributed::Partition< \
            LocalIndexType, GlobalIndexType>* row_partition, \
        comm_index_type local_part, \
        const array<GlobalIndexType>& send_positions, \
        const array<GlobalIndexType>& original_positions, \
        array<GlobalIndexType>& send_row_idxs, \
        array<GlobalIndexType>& send_col_idxs, array<ValueType>& send_values)


#define GKO_DECLARE_SEPARATE_LOCAL_NONLOCAL(ValueType, LocalIndexType, \
GlobalIndexType) \
void separate_local_nonlocal( \
Expand All @@ -35,11 +61,18 @@ namespace kernels {
array<ValueType>& non_local_values)


#define GKO_DECLARE_ALL_AS_TEMPLATES \
using comm_index_type = experimental::distributed::comm_index_type; \
template <typename ValueType, typename LocalIndexType, \
typename GlobalIndexType> \
GKO_DECLARE_SEPARATE_LOCAL_NONLOCAL(ValueType, LocalIndexType, \
// Expands every kernel declaration above into a template declaration; used by
// the per-backend kernel namespaces to declare all distributed-matrix kernels.
#define GKO_DECLARE_ALL_AS_TEMPLATES                                          \
    using comm_index_type = experimental::distributed::comm_index_type;       \
    template <typename ValueType, typename LocalIndexType,                    \
              typename GlobalIndexType>                                       \
    GKO_DECLARE_COUNT_NON_OWNING_ENTRIES(ValueType, LocalIndexType,           \
                                         GlobalIndexType);                    \
    template <typename ValueType, typename LocalIndexType,                    \
              typename GlobalIndexType>                                       \
    GKO_DECLARE_FILL_SEND_BUFFERS(ValueType, LocalIndexType, GlobalIndexType); \
    template <typename ValueType, typename LocalIndexType,                    \
              typename GlobalIndexType>                                       \
    GKO_DECLARE_SEPARATE_LOCAL_NONLOCAL(ValueType, LocalIndexType,            \
                                        GlobalIndexType)


Expand Down
Loading
Loading