Skip to content

Commit

Permalink
Merge pull request #88 from ekg/speedwish
Browse files Browse the repository at this point in the history
Parallelize the mapping of collection of bases from the full sequence set into a dense range
  • Loading branch information
AndreaGuarracino authored Dec 8, 2021
2 parents 0efdd54 + beb7a38 commit ccfefb0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/alignments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ void paf_worker(
// Query/Target end (0-based; BED-like; open)
paf.target_start >= paf.target_sequence_length || paf.target_end > paf.target_sequence_length || paf.target_start >= paf.target_end) break;
size_t query_idx = seqidx.rank_of_seq_named(paf.query_sequence_name);
size_t query_len = seqidx.nth_seq_length(query_idx);
//size_t query_len = seqidx.nth_seq_length(query_idx);
size_t target_idx = seqidx.rank_of_seq_named(paf.target_sequence_name);
size_t target_len = seqidx.nth_seq_length(target_idx);
//size_t target_len = seqidx.nth_seq_length(target_idx);
bool q_rev = !paf.query_target_same_strand;
size_t q_all_pos = (q_rev ? seqidx.pos_in_all_seqs(query_idx, paf.query_end, false) - 1
: seqidx.pos_in_all_seqs(query_idx, paf.query_start, false));
Expand Down
6 changes: 3 additions & 3 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ int main(int argc, char** argv) {
seqidx.save();
if (args::get(show_progress)) std::cerr << "[seqwish::seqidx] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " index built" << std::endl;

if (args::get(show_progress)) std::cerr << "[seqwish::alignments] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " processing alignments" << std::endl;
// 2) parse the alignments into position pairs and index (A)
if (args::get(show_progress)) std::cerr << "[seqwish::alignments] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " processing alignments" << std::endl;
std::string aln_idx = work_base + ".sqa";
std::remove(aln_idx.c_str());
auto aln_iitree_ptr = std::make_unique<mmmulti::iitree<uint64_t, pos_t>>(aln_idx);
Expand Down Expand Up @@ -169,7 +169,7 @@ int main(int argc, char** argv) {
size_t graph_length = compute_transitive_closures(seqidx, aln_iitree, seq_v_file, node_iitree, path_iitree,
args::get(repeat_max),
args::get(min_repeat_dist),
transclose_batch ? seqwish::handy_parameter(args::get(transclose_batch), 1000000) : 1000000,
transclose_batch ? (uint64_t)seqwish::handy_parameter(args::get(transclose_batch), 1000000) : 1000000,
args::get(show_progress),
num_threads,
start_time);
Expand Down Expand Up @@ -208,8 +208,8 @@ int main(int argc, char** argv) {
derive_links(seqidx, node_iitree, path_iitree, seq_id_cbv, seq_id_cbv_rank, seq_id_cbv_select, link_mmset, num_threads);
if (args::get(show_progress)) std::cerr << "[seqwish::links] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " links derived" << std::endl;

if (args::get(show_progress)) std::cerr << "[seqwish::gfa] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " writing graph" << std::endl;
// 6) emit the graph in GFA or VGP format
if (args::get(show_progress)) std::cerr << "[seqwish::gfa] " << std::fixed << std::showpoint << std::setprecision(3) << seconds_since(start_time) << " writing graph" << std::endl;
if (!args::get(gfa_out).empty()) {
std::ofstream out(args::get(gfa_out).c_str());
emit_gfa(out, graph_length, seq_v_file, node_iitree, path_iitree, seq_id_cbv, seq_id_cbv_rank, seq_id_cbv_select, seqidx, link_mmset, num_threads);
Expand Down
2 changes: 1 addition & 1 deletion src/seqindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void seqindex_t::build_index(const std::string& filename, const std::string& idx
bool notified_empty_seqs = false;
while (in.good()) {
line[0] = '>';
std::string seq_name = line.substr(0, line.find(" "));
std::string seq_name = line.substr(0, line.find(' '));

std::string seq;
// get the sequence
Expand Down
107 changes: 77 additions & 30 deletions src/transclosure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void flush_range(std::map<pos_t, range_t>::iterator it,
void for_each_fresh_range(const match_t& range,
const std::vector<bool>& seen_bv,
const std::function<void(match_t)>& lambda) {
// walk range, breaking where we've seen it, emiting new ranges
// walk range, breaking where we've seen it, emitting new ranges
uint64_t p = range.start;
pos_t t = range.pos;
//std::cerr << "for_each_fresh_range " << range.start << "-" << range.end << " " << pos_to_string(range.pos) << std::endl;
Expand Down Expand Up @@ -257,17 +257,17 @@ void write_graph_chunk(const seqindex_t& seqidx,


size_t compute_transitive_closures(
const seqindex_t& seqidx,
mmmulti::iitree<uint64_t, pos_t>& aln_iitree, // input alignment matches between query seqs
const std::string& seq_v_file,
mmmulti::iitree<uint64_t, pos_t>& node_iitree, // maps graph seq ranges to input seq ranges
mmmulti::iitree<uint64_t, pos_t>& path_iitree, // maps input seq ranges to graph seq ranges
uint64_t repeat_max,
uint64_t min_repeat_dist,
uint64_t transclose_batch_size, // size of a batch to collect for lock-free transitive closure
bool show_progress,
uint64_t num_threads,
const std::chrono::time_point<std::chrono::steady_clock>& start_time) {
const seqindex_t& seqidx,
mmmulti::iitree<uint64_t, pos_t>& aln_iitree, // input alignment matches between query seqs
const std::string& seq_v_file,
mmmulti::iitree<uint64_t, pos_t>& node_iitree, // maps graph seq ranges to input seq ranges
mmmulti::iitree<uint64_t, pos_t>& path_iitree, // maps input seq ranges to graph seq ranges
uint64_t repeat_max,
uint64_t min_repeat_dist,
uint64_t transclose_batch_size, // size of a batch to collect for lock-free transitive closure
bool show_progress,
uint64_t num_threads,
const std::chrono::time_point<std::chrono::steady_clock>& start_time) {
// open the writers in the iitrees
node_iitree.open_writer();
path_iitree.open_writer();
Expand All @@ -294,16 +294,16 @@ size_t compute_transitive_closures(
while (i < input_seq_length && q_seen_bv[i]) ++i;
//std::cerr << "scanned_to\t" << i << std::endl;
if (i >= input_seq_length) break; // we're done!

// where our chunk begins
uint64_t chunk_start = i;
// extend until we've got chunk_size unseen bases
// extend until we've got chunk_size unseen bases (and where it ends (not past the end of the sequence))
uint64_t bases_to_consider = 0;
uint64_t chunk_end = chunk_start;
while (bases_to_consider < transclose_batch_size && chunk_end < input_seq_length) {
bases_to_consider += !q_seen_bv[chunk_end++];
}
// and where it ends (not past the end of the sequence)
//chunk_end = std::min(input_seq_length, chunk_end); // chunk_start + transclose_batch_size);

// collect ranges overlapping, per thread to avoid contention
// bits of sequence we've seen during this union-find chunk
atomicbitvector::atomic_bv_t q_curr_bv(seqidx.seq_length());
Expand Down Expand Up @@ -374,14 +374,13 @@ size_t compute_transitive_closures(
}
// manage the threads
uint64_t empty_iter_count = 0;
auto still_exploring
= [&explorings](void) {
bool ongoing = false;
for (auto& e : explorings) {
ongoing = ongoing || e.load();
}
return ongoing;
};
auto still_exploring = [&explorings]() {
bool ongoing = false;
for (auto& e : explorings) {
ongoing = ongoing || e.load();
}
return ongoing;
};
work_todo.store(true);
while (!todo_in.was_empty() || !todo.empty() || !todo_out.was_empty() || !ovlp_q.was_empty() || still_exploring() || ++empty_iter_count < 1000) {
std::this_thread::sleep_for(0.00001ns);
Expand Down Expand Up @@ -422,16 +421,64 @@ size_t compute_transitive_closures(
#endif
// run the transclosure for this region using lock-free union find
// convert the ranges into positions in the input sequence space
uint64_t q_curr_bv_count = 0;
//std::cerr << "q_subset_bv ";
for (auto x : q_curr_bv) {
++q_curr_bv_count;
// ... compute ranges
std::vector<std::pair<uint64_t, uint64_t>> ranges;
{
uint64_t begin = 0;
uint64_t end = q_curr_bv.size();
uint64_t chunk_size = end/num_threads;

std::pair<uint64_t, uint64_t> todo_range = std::make_pair(begin, std::min(begin + chunk_size, end));
uint64_t& todo_i = todo_range.first;
uint64_t& todo_j = todo_range.second;
while (todo_i != end) {
ranges.push_back(todo_range);
todo_i = std::min(todo_i + chunk_size, end);
todo_j = std::min(todo_j + chunk_size, end);
}
}

// ... compute how many set elements in each range and in total
std::vector<uint64_t> q_curr_bv_counts; q_curr_bv_counts.resize(ranges.size());
for (auto& x : q_curr_bv_counts) { x = 0; }
paryfor::parallel_for<uint64_t>(
0, ranges.size(), num_threads,
[&](uint64_t num_range, int tid) {
for (uint64_t ii = ranges[num_range].first; ii < ranges[num_range].second; ++ii) {
if (q_curr_bv[ii]) {
++q_curr_bv_counts[num_range];
}
}
});
uint64_t q_curr_bv_count = 0;
for(auto& value : q_curr_bv_counts) { q_curr_bv_count += value; }

// use a rank support to make a dense mapping from the current bases to an integer range
std::vector<uint64_t> q_curr_bv_vec; q_curr_bv_vec.reserve(q_curr_bv_count);
for (auto p : q_curr_bv) {
q_curr_bv_vec.push_back(p);
// ... pre-allocate default values
std::vector<uint64_t> q_curr_bv_vec; q_curr_bv_vec.resize(q_curr_bv_count);
// ... prepare offsets in the vector to fill: we already know the number of items that will come from each range.
{
// ...... range 0 elements will be inserted from the position 0
// ...... range 1 elements will be inserted from the position num_element_range_0
// ...... range N elements will be inserted from the position num_element_range_0+1+...+N-1
uint64_t current_starting_pos_for_range = q_curr_bv_count;
for(int64_t num_range = (int64_t)ranges.size() - 1; num_range >= 0; --num_range) {
current_starting_pos_for_range -= q_curr_bv_counts[num_range];
q_curr_bv_counts[num_range] = current_starting_pos_for_range;
}
}

// ... fill in parallel
paryfor::parallel_for<uint64_t>(
0, ranges.size(), num_threads,
[&](uint64_t num_range, int tid) {
for (uint64_t ii = ranges[num_range].first; ii < ranges[num_range].second; ++ii) {
if (q_curr_bv[ii]) {
q_curr_bv_vec[q_curr_bv_counts[num_range]++] = ii;
}
}
});

sdsl::bit_vector q_curr_bv_sdsl(seqidx.seq_length());
for (auto p : q_curr_bv_vec) {
q_curr_bv_sdsl[p] = 1;
Expand Down

0 comments on commit ccfefb0

Please sign in to comment.