From 8567bff935921c37dc038f186c9036ace5a41096 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Fri, 4 Sep 2020 09:43:12 +0200
Subject: [PATCH 01/51] Intermediate
---
metagraph/src/cli/build.cpp | 15 +-
metagraph/src/common/elias_fano.cpp | 14 +-
metagraph/src/common/elias_fano.hpp | 2 +-
.../sorted_sets/sorted_set_disk_base.cpp | 2 +-
.../succinct/boss_chunk_construct.cpp | 353 ++++++++++++------
.../succinct/boss_chunk_construct.hpp | 6 +-
6 files changed, 258 insertions(+), 134 deletions(-)
diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp
index 9e49828e42..2b131f40ac 100644
--- a/metagraph/src/cli/build.cpp
+++ b/metagraph/src/cli/build.cpp
@@ -11,6 +11,7 @@
#include "graph/representation/bitmap/dbg_bitmap_construct.hpp"
#include "graph/representation/succinct/dbg_succinct.hpp"
#include "graph/representation/succinct/boss_construct.hpp"
+#include "graph/representation/succinct/build_checkpoint.hpp"
#include "graph/graph_extensions/node_weights.hpp"
#include "config/config.hpp"
#include "parse_sequences.hpp"
@@ -105,6 +106,8 @@ int build_graph(Config *config) {
logger->info("k-mer suffix: '{}'", suffix);
}
+ boss::BuildCheckpoint checkpoint(config->tmp_dir);
+
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
config->canonical,
@@ -115,10 +118,18 @@ int build_graph(Config *config) {
config->tmp_dir.empty() ? kmer::ContainerType::VECTOR
: kmer::ContainerType::VECTOR_DISK,
config->tmp_dir,
- config->disk_cap_bytes
+ config->disk_cap_bytes,
+ checkpoint
);
- push_sequences(files, *config, timer, constructor.get());
+ if (checkpoint.continuation_phase() == 0) {
+ push_sequences(files, *config, timer, constructor.get());
+ checkpoint.set_phase(1);
+ checkpoint.set_kmer_dir(constructor->tmp_dir());
+ checkpoint.store();
+ } else {
+ logger->info("Skipping parsing sequences from input file(s)");
+ }
boss::BOSS::Chunk *next_chunk = constructor->build_chunk();
logger->trace("Graph chunk with {} k-mers was built in {} sec",
diff --git a/metagraph/src/common/elias_fano.cpp b/metagraph/src/common/elias_fano.cpp
index 36213968fd..3fc1f46100 100644
--- a/metagraph/src/common/elias_fano.cpp
+++ b/metagraph/src/common/elias_fano.cpp
@@ -14,9 +14,10 @@
namespace mtg {
namespace common {
-void concat(const std::vector<std::string> &files, const std::string &result) {
+std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result) {
     if (files.empty())
-        return;
+        return {};
+    std::vector<std::string> original_files;
     std::vector<std::string> suffixes = { "", ".up" };
if (std::filesystem::exists(files[0] + ".count"))
@@ -24,17 +25,16 @@ void concat(const std::vector &files, const std::string &result) {
for (const std::string &suffix : suffixes) {
std::string concat_command = "cat ";
- for (uint32_t i = 1; i < files.size(); ++i) {
+ for (uint32_t i = 0; i < files.size(); ++i) {
concat_command += files[i] + suffix + " ";
}
- concat_command += " >> " + files[0] + suffix;
-
+ concat_command += " > " + result + suffix;
+ logger->trace("Executing '{}'", concat_command);
if (std::system(concat_command.c_str()))
throw std::runtime_error("Error while cat-ing files: " + concat_command);
- std::filesystem::rename(files[0] + suffix, result + suffix);
for (const std::string &f : files) {
- std::filesystem::remove(f + suffix);
+ original_files.push_back(f + suffix);
}
}
}
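
With this change, concat() writes the concatenation of #files to #result and returns the names of the source files instead of deleting them, leaving deletion to the caller. A minimal usage sketch (paths are illustrative):

    // The caller now owns deletion of the concatenated sources, so the
    // sources survive on disk until a checkpoint confirms the result.
    std::vector<std::string> chunks = { "/tmp/chunk_0", "/tmp/chunk_1" };
    std::vector<std::string> to_delete = common::concat(chunks, "/tmp/merged");
    // ... store the checkpoint, then:
    for (const std::string &f : to_delete)
        std::filesystem::remove(f);
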
diff --git a/metagraph/src/common/elias_fano.hpp b/metagraph/src/common/elias_fano.hpp
index 8b35d2013c..cdee2bf45d 100644
--- a/metagraph/src/common/elias_fano.hpp
+++ b/metagraph/src/common/elias_fano.hpp
@@ -21,7 +21,7 @@ namespace common {
* The files store data that is ordered and the values in a file are smaller than the
* values in the next file.
*/
-void concat(const std::vector<std::string> &files, const std::string &result);
+std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result);
/**
* Elias-Fano encoder that streams the encoded result into a file.
diff --git a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
index 685ee2a5e7..49c8bc7092 100644
--- a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
+++ b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
@@ -91,7 +91,7 @@ void SortedSetDiskBase<T>::start_merging_async() {
     async_worker_.enqueue([file_names, this]() {
         std::function<void(const T &)> on_new_item
                 = [this](const T &v) { merge_queue_.push(v); };
- merge_files(file_names, on_new_item);
+ merge_files(file_names, on_new_item, false);
merge_queue_.shutdown();
});
}
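
The new third argument suggests merge_files() gained a flag controlling whether the merged source files are removed; the declaration is not shown in this series, but judging from the call sites ("false /* remove sources */" below), it is roughly:

    // Assumed signature, inferred from the call sites in this series.
    template <typename T>
    void merge_files(const std::vector<std::string> &files,
                     std::function<void(const T &)> on_new_item,
                     bool remove_sources = true);

Passing false keeps the sorted chunks on disk so a later checkpoint phase can re-merge them after a crash.
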
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
index aaf8196319..38f3c50d71 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
@@ -243,7 +243,8 @@ template
void recover_dummy_nodes(const KmerCollector &kmer_collector,
                         Vector<T> &kmers,
                         ChunkedWaitQueue<T> *kmers_out,
-                        ThreadPool &async_worker) {
+                        ThreadPool &async_worker,
+                        const BuildCheckpoint& ) {
     using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $
using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer
@@ -398,7 +399,8 @@ std::tuple<std::vector<std::string>, std::vector<std::string>, std::string>
 generate_dummy_1_kmers(size_t k,
                        size_t num_threads,
                        const std::filesystem::path &dir,
-                       ChunkedWaitQueue<T_REAL> &kmers) {
+                       ChunkedWaitQueue<T_REAL> &kmers,
+                       BuildCheckpoint *checkpoint) {
     using KMER = get_first_type_t<T>; // 64/128/256-bit KmerExtractorBOSS::KmerBOSS
     using KMER_INT = typename KMER::WordType; // KmerExtractorBOSS::KmerBOSS::WordType
@@ -407,6 +409,8 @@ generate_dummy_1_kmers(size_t k,
     // for a DNA alphabet, this will contain 16 chunks, split by kmer[0] and kmer[1]
     std::vector<std::string> real_F_W = split(k, dir, kmers);
+ checkpoint->set_phase(3);
+ checkpoint->store();
const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size();
@@ -421,97 +425,115 @@ generate_dummy_1_kmers(size_t k,
dummy_sink_chunks.emplace_back(dummy_sink_names[i], ENCODER_BUFFER_SIZE);
}
- logger->trace("Generating dummy-1 source k-mers and dummy sink k-mers...");
- uint64_t num_sink = 0;
- uint64_t num_source = 0;
+ if (checkpoint->continuation_phase() < 3) {
+ logger->info("Generating dummy-1 source k-mers and dummy sink k-mers...");
+ uint64_t num_sink = 0;
+ uint64_t num_source = 0;
- static constexpr size_t L = KMER::kBitsPerChar;
- KMER_INT kmer_delta = kmer::get_sentinel_delta(L, k + 1);
- // reset kmer[1] (the first character in k-mer, $ in dummy source) to zero
- kmer_delta &= ~KMER_INT(((1ull << L) - 1) << L);
-
- #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
- for (TAlphabet F = 0; F < alphabet_size; ++F) {
-
- // stream k-mers of pattern ***F*
-        std::vector<std::string> F_chunks(real_F_W.begin() + F * alphabet_size,
-                                          real_F_W.begin() + (F + 1) * alphabet_size);
-        common::MergeDecoder<KMER_INT_REAL> it(F_chunks, false);
-
-        std::vector<std::string> W_chunks; // chunks with k-mers of the form ****F
-        for (TAlphabet c = 0; c < alphabet_size; ++c) {
-            W_chunks.push_back(real_F_W[c * alphabet_size + F]);
-        }
-        common::ConcatDecoder<KMER_INT_REAL> sink_gen_it(W_chunks);
-
- while (!it.empty()) {
- KMER_REAL dummy_source(it.pop());
- // skip k-mers that would generate identical source dummy k-mers
- skip_same_suffix(dummy_source, it, 0);
- dummy_source.to_prev(k + 1, 0);
- // generate dummy sink k-mers from all non-dummy kmers smaller than |dummy_source|
- while (!sink_gen_it.empty()
- && sink_gen_it.top() < dummy_source.data()) {
+ static constexpr size_t L = KMER::kBitsPerChar;
+ KMER_INT kmer_delta = kmer::get_sentinel_delta(L, k + 1);
+ // reset kmer[1] (the first character in k-mer, $ in dummy source) to zero
+ kmer_delta &= ~KMER_INT(((1ull << L) - 1) << L);
+
+ #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
+ for (TAlphabet F = 0; F < alphabet_size; ++F) {
+ // stream k-mers of pattern ***F*
+            std::vector<std::string> F_chunks(real_F_W.begin() + F * alphabet_size,
+                                              real_F_W.begin() + (F + 1) * alphabet_size);
+            common::MergeDecoder<KMER_INT_REAL> it(F_chunks, false);
+
+            std::vector<std::string> W_chunks; // chunks with k-mers of the form ****F
+            for (TAlphabet c = 0; c < alphabet_size; ++c) {
+                W_chunks.push_back(real_F_W[c * alphabet_size + F]);
+            }
+            common::ConcatDecoder<KMER_INT_REAL> sink_gen_it(W_chunks);
+
+ while (!it.empty()) {
+ KMER_REAL dummy_source(it.pop());
+ // skip k-mers that would generate identical source dummy k-mers
+ skip_same_suffix(dummy_source, it, 0);
+ dummy_source.to_prev(k + 1, 0);
+ // generate dummy sink k-mers from all non-dummy kmers smaller than |dummy_source|
+ while (!sink_gen_it.empty() && sink_gen_it.top() < dummy_source.data()) {
+ KMER_REAL v(sink_gen_it.pop());
+ // skip k-mers with the same suffix as v, as they generate identical
+ // dummy sink k-mers
+ skip_same_suffix(v, sink_gen_it, 1);
+ dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
+ num_sink++;
+ }
+ if (!sink_gen_it.empty()) {
+ KMER_REAL top(sink_gen_it.top());
+ if (KMER_REAL::compare_suffix(top, dummy_source, 1)) {
+ // The source dummy k-mer #dummy_source generated from #it is
+ // redundant iff it shares its suffix with another real k-mer (#top).
+ // In this case, #top generates a dummy sink k-mer redundant with
+ // #it. So if #dummy_source is redundant, the sink generated from
+ // #top is also redundant - so it's being skipped
+ skip_same_suffix(top, sink_gen_it, 1);
+ continue;
+ }
+ }
+ // lift all and reset the first character to the sentinel 0 (apply mask)
+ dummy_l1_chunks[F].add(kmer::transform(dummy_source, k + 1)
+ + kmer_delta);
+ num_source++;
+ }
+ // handle leftover sink_gen_it
+ while (!sink_gen_it.empty()) {
KMER_REAL v(sink_gen_it.pop());
- // skip k-mers with the same suffix as v, as they generate identical dummy
- // sink k-mers
skip_same_suffix(v, sink_gen_it, 1);
dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
num_sink++;
}
- if (!sink_gen_it.empty()) {
- KMER_REAL top(sink_gen_it.top());
- if (KMER_REAL::compare_suffix(top, dummy_source, 1)) {
- // The source dummy k-mer #dummy_source generated from #it is
- // redundant iff it shares its suffix with another real k-mer (#top).
- // In this case, #top generates a dummy sink k-mer redundant with #it.
- // So if #dummy_source is redundant, the sink generated from #top is
- // also redundant - so it's being skipped
- skip_same_suffix(top, sink_gen_it, 1);
- continue;
- }
- }
- // lift all and reset the first character to the sentinel 0 (apply mask)
- dummy_l1_chunks[F].add(kmer::transform(dummy_source, k + 1) + kmer_delta);
- num_source++;
}
- // handle leftover sink_gen_it
- while (!sink_gen_it.empty()) {
- KMER_REAL v(sink_gen_it.pop());
- skip_same_suffix(v, sink_gen_it, 1);
- dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
- num_sink++;
+
+ for (TAlphabet i = 0; i < alphabet_size; ++i) {
+ dummy_sink_chunks[i].finish();
+ dummy_l1_chunks[i].finish();
}
- }
- for (TAlphabet i = 0; i < alphabet_size; ++i) {
- dummy_sink_chunks[i].finish();
- dummy_l1_chunks[i].finish();
+ logger->trace("Generated {} dummy sink and {} dummy source k-mers", num_sink,
+ num_source);
+ checkpoint->set_phase(3);
+ checkpoint->store();
+ } else {
+        logger->info("Skipping generating dummy-1 source k-mers and dummy sink k-mers");
}
- logger->trace("Generated {} dummy sink and {} dummy source k-mers",
- num_sink, num_source);
-
- // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply
- // concatenating the blocks will result in a single ordered block
- logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...",
- dummy_sink_names.size());
- std::string dummy_sink_name = dir/"dummy_sink";
- common::concat(dummy_sink_names, dummy_sink_name);
-
- // similarly, the 16 blocks of the original k-mers can be concatenated in groups of
- // 4 without destroying the order
- logger->trace("Concatenating blocks of original real k-mers ({} -> {})...",
- real_F_W.size(), alphabet_size);
-    std::vector<std::string> real_split_by_W;
+    std::vector<std::string> real_split_by_W(alphabet_size);
+    std::string dummy_sink_name = dir / "dummy_sink";
     for (TAlphabet W = 0; W < alphabet_size; ++W) {
-        std::vector<std::string> blocks;
-        for (TAlphabet F = 0; F < alphabet_size; ++F) {
-            blocks.push_back(real_F_W[F * alphabet_size + W]);
+        real_split_by_W[W] = dir/("real_split_by_W_" + std::to_string(W));
+ }
+ if (checkpoint->continuation_phase() < 4) {
+ // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply
+ // concatenating the blocks will result in a single ordered block
+ logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...",
+ dummy_sink_names.size());
+        std::vector<std::string> to_delete
+                = common::concat(dummy_sink_names, dummy_sink_name);
+
+ // similarly, the 16 blocks of the original k-mers can be concatenated in
+ // groups of 4 without destroying the order
+ logger->trace("Concatenating blocks of original real k-mers ({} -> {})...",
+ real_F_W.size(), alphabet_size);
+ for (TAlphabet W = 0; W < alphabet_size; ++W) {
+            std::vector<std::string> blocks;
+ for (TAlphabet F = 0; F < alphabet_size; ++F) {
+ blocks.push_back(real_F_W[F * alphabet_size + W]);
+ }
+            std::vector<std::string> original
+                    = common::concat(blocks, real_split_by_W[W]);
+ to_delete.insert(to_delete.end(), original.begin(), original.end());
}
- real_split_by_W.push_back(dir/("real_split_by_W_" + std::to_string(W)));
- common::concat(blocks, real_split_by_W.back());
+ for (const auto &name : to_delete) {
+ std::filesystem::remove(name);
+ }
+ checkpoint->set_phase(4);
+ checkpoint->store();
}
+
return { real_split_by_W, dummy_l1_names, dummy_sink_name };
}
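
The real_F_W chunks are laid out row-major: chunk F * alphabet_size + W holds the real k-mers with first character F (kmer[1]) and last character W (kmer[0]). Per the comments above, within a fixed W the blocks with larger F hold larger k-mers, which is why concatenating over F yields one sorted file per W. A small illustration, assuming DNA (alphabet_size = 4):

    // Illustration only: collect the 4 sorted blocks whose k-mers end in W.
    // Concatenating blocks[0..3] is again sorted, so no merge is needed.
    std::vector<std::string> blocks_for_W(const std::vector<std::string> &real_F_W,
                                          uint8_t alphabet_size, uint8_t W) {
        std::vector<std::string> blocks;
        for (uint8_t F = 0; F < alphabet_size; ++F)
            blocks.push_back(real_F_W[F * alphabet_size + W]); // row-major F, then W
        return blocks;
    }
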
@@ -552,47 +574,93 @@ void add_reverse_complements(size_t k,
size_t buffer_size,
const std::filesystem::path &dir,
ThreadPool& async_worker,
-                             ChunkedWaitQueue<T_REAL> *kmers) {
+                             ChunkedWaitQueue<T_REAL> *kmers,
+                             BuildCheckpoint *checkpoint) {
     using T_INT_REAL = get_int_t<T_REAL>; // either KMER_INT or <KMER_INT, count>
     std::string rc_dir = dir/"rc";
     std::filesystem::create_directory(rc_dir);
     auto rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
             num_threads, buffer_size, rc_dir, std::numeric_limits<size_t>::max());
-    logger->trace("Adding reverse complements...");
+
     common::EliasFanoEncoderBuffered<T_INT_REAL> original(dir/"original", ENCODER_BUFFER_SIZE);
     Vector<T_INT_REAL> buffer;
buffer.reserve(10'000);
- for (auto &it = kmers->begin(); it != kmers->end(); ++it) {
- const T_REAL &kmer = *it;
- const T_REAL &reverse = rev_comp(k + 1, *it, KmerExtractor2Bit().complement_code());
- if (get_first(kmer) != get_first(reverse)) {
-            buffer.push_back(reinterpret_cast<const T_INT_REAL &>(reverse));
- if (buffer.size() == buffer.capacity()) {
- rc_set->insert(buffer.begin(), buffer.end());
- buffer.resize(0);
- }
-            original.add(reinterpret_cast<const T_INT_REAL &>(kmer));
- } else {
- if constexpr (utils::is_pair_v) {
- using C = typename T_REAL::second_type;
- if (kmer.second >> (sizeof(C) * 8 - 1)) {
- original.add({ kmer.first.data(), std::numeric_limits::max() });
- } else {
- original.add({ kmer.first.data(), 2 * kmer.second });
+ if (checkpoint->continuation_phase() < 2) {
+ logger->info("Adding reverse complements...");
+ for (auto &it = kmers->begin(); it != kmers->end(); ++it) {
+ const T_REAL &kmer = *it;
+ const T_REAL &reverse
+ = rev_comp(k + 1, *it, KmerExtractor2Bit().complement_code());
+ if (get_first(kmer) != get_first(reverse)) {
+            buffer.push_back(reinterpret_cast<const T_INT_REAL &>(reverse));
+ if (buffer.size() == buffer.capacity()) {
+ rc_set->insert(buffer.begin(), buffer.end());
+ buffer.resize(0);
}
- } else {
                original.add(reinterpret_cast<const T_INT_REAL &>(kmer));
+ } else {
+ if constexpr (utils::is_pair_v) {
+ using C = typename T_REAL::second_type;
+ if (kmer.second >> (sizeof(C) * 8 - 1)) {
+ original.add({ kmer.first.data(), std::numeric_limits::max() });
+ } else {
+ original.add({ kmer.first.data(), 2 * kmer.second });
+ }
+ } else {
+                original.add(reinterpret_cast<const T_INT_REAL &>(kmer));
+ }
}
}
+ rc_set->insert(buffer.begin(), buffer.end());
+ original.finish();
+ checkpoint->set_phase(2);
+ checkpoint->store();
+ } else {
+ logger->info("Skipping adding reverse complements");
}
- rc_set->insert(buffer.begin(), buffer.end());
- original.finish();
+
+ if (checkpoint->continuation_phase() == 2) {
+ logger->info(
+ "Continuing from checkpoint phase 2. Looking for 'original' and "
+ "'rc/chunk_*' in {}",
+ checkpoint->kmer_dir());
+ if (!std::filesystem::exists(checkpoint->kmer_dir()/"original")) {
+ logger->error(
+                    "Could not find {}. Recovery not possible. Remove the tmp dir to "
+                    "restart the computation.",
+ checkpoint->kmer_dir()/"original");
+ }
+        std::vector<std::string> file_names;
+ for (const auto &path : std::filesystem::directory_iterator(checkpoint->kmer_dir()/"rc")) {
+ if (path.is_regular_file()
+ && path.path().filename().string().find("chunk_", 0) == 0
+ && path.path().filename().extension() == "") {
+ logger->trace("Found chunk: {}", path.path().string());
+ file_names.push_back(path.path().string());
+ }
+ }
+ if (file_names.empty()) {
+ logger->error(
+ "Could not find chunk_* files in {}. Recovery not possible. "
+ "Remove temp dir to restart the computation from scratch.",
+ checkpoint->kmer_dir());
+ std::exit(1);
+ }
+ rc_set.reset();
+ async_worker.enqueue([kmers, file_names = std::move(file_names)]() {
+            std::function<void(const T_INT_REAL &)> on_new_item
+                    = [&kmers, &file_names](const T_INT_REAL &v) { rc_set.push(v); };
+ merge_files(file_names, on_new_item, false);
+ rc_set.shutdown();
+ });
+ }
+
// start merging #original with #reverse_complements into #kmers
kmers->reset();
async_worker.enqueue([rc_set = std::move(rc_set), &dir, kmers]() {
        ChunkedWaitQueue<T_INT_REAL> &reverse_complements = rc_set->data(true);
-        common::EliasFanoDecoder<T_INT_REAL> original_kmers(dir / "original");
+        common::EliasFanoDecoder<T_INT_REAL> original_kmers(dir/"original");
merge(original_kmers, reverse_complements, kmers);
});
}
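
For counted k-mers, the loop above doubles the count of a k-mer that equals its own reverse complement (both strands contribute to the same entry) and saturates instead of overflowing: a count whose top bit is set would overflow when doubled, so it is clamped to the maximum. A worked sketch with an assumed 8-bit count type:

    // Sketch of the saturating doubling used above (8-bit count assumed).
    #include <cstdint>
    #include <limits>

    uint8_t double_count_saturating(uint8_t count) {
        // Top bit set means 2 * count would overflow the count type.
        if (count >> (sizeof(uint8_t) * 8 - 1))
            return std::numeric_limits<uint8_t>::max();
        return 2 * count;
    }
    // double_count_saturating(100) == 200, double_count_saturating(200) == 255.
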
@@ -612,7 +680,8 @@ template
void recover_dummy_nodes(const KmerCollector &kmer_collector,
                         ChunkedWaitQueue<T_REAL> &kmers,
                         ChunkedWaitQueue<T> *kmers_out,
- ThreadPool &async_worker) {
+ ThreadPool &async_worker,
+ BuildCheckpoint* checkpoint) {
    using KMER_REAL = get_first_type_t<T_REAL>; // 64/128/256-bit KmerBOSS
    using T_INT_REAL = get_int_t<T_REAL>; // either KMER_INT or <KMER_INT, count>
@@ -620,27 +689,57 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer
size_t k = kmer_collector.get_k() - 1;
- const std::filesystem::path dir = kmer_collector.tmp_dir();
+ const std::filesystem::path dir = checkpoint->continuation_phase() == 0
+ ? kmer_collector.tmp_dir()
+ : checkpoint->kmer_dir();
size_t num_threads = kmer_collector.num_threads();
+ if (checkpoint->continuation_phase() == 1) {
+ logger->info(
+ "Continuing from checkpoint phase 1. Looking for chunk_* files in {}",
+ checkpoint->kmer_dir());
+        std::vector<std::string> file_names;
+ for (const auto &path : std::filesystem::directory_iterator(checkpoint->kmer_dir())) {
+ if (path.is_regular_file()
+ && path.path().filename().string().find("chunk_", 0) == 0
+ && path.path().filename().extension() == "") {
+ logger->trace("Found chunk: {}", path.path().string());
+ file_names.push_back(path.path().string());
+ }
+ }
+ if (file_names.empty()) {
+ logger->error(
+ "Could not find chunk_* files in {}. Recovery not possible. "
+ "Remove temp dir to restart the computation from scratch.",
+ checkpoint->kmer_dir());
+ std::exit(1);
+ }
+ kmers.reset();
+ async_worker.enqueue([kmers, file_names = std::move(file_names)]() {
+            std::function<void(const T &)> on_new_item
+ = [&kmers, &file_names](const T &v) { kmers.push(v); };
+ merge_files(file_names, on_new_item, false);
+ kmers.shutdown();
+ });
+ }
+
if (kmer_collector.is_both_strands_mode()) {
// compute the reverse complements of #kmers, then merge back into #kmers
add_reverse_complements(k, num_threads, kmer_collector.buffer_size(), dir,
- async_worker, &kmers);
+ async_worker, &kmers, checkpoint);
}
    std::string dummy_sink_name;
    std::vector<std::string> real_split_by_W;
    std::vector<std::string> dummy_names;
std::tie(real_split_by_W, dummy_names, dummy_sink_name)
- = generate_dummy_1_kmers(k, num_threads, dir, kmers);
+ = generate_dummy_1_kmers(k, num_threads, dir, kmers, checkpoint);
- // stores the sorted original kmers and dummy-1 k-mers
+ // file names for the dummy_sink_0..3 and dummy_source_0..k_0..3 kmers
    std::vector<std::string> dummy_chunks = { dummy_sink_name };
// generate dummy k-mers of prefix length 1..k
logger->trace("Starting generating dummy-1..k source k-mers...");
for (size_t dummy_pref_len = 1; dummy_pref_len <= k; ++dummy_pref_len) {
- // this will compress all sorted dummy k-mers of given prefix length
for (const std::string &f : dummy_names) {
dummy_chunks.push_back(f);
}
@@ -677,8 +776,11 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
    const std::function<void(const KMER_INT &)> on_merge = [](const KMER_INT &) {};
common::merge_files(dummy_names, on_merge);
- // at this point, we have the original k-mers and dummy-1 k-mers in original_and_dummy_l1,
- // the dummy-x k-mers in dummy_source_{x}, and we merge them all into a single stream
+ checkpoint->set_phase(6);
+ checkpoint->store();
+
+ // at this point, we have the original k-mers in real_split_by_W, the dummy-x k-mers
+ // in dummy_chunks, and we merge them all into a single stream
kmers_out->reset();
// add the main dummy source k-mer
@@ -703,7 +805,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
return kmer::transform(reinterpret_cast(v), k + 1) + kmer_delta;
}
},
- real_split_by_W, true
+ real_split_by_W, false /* remove sources */
);
common::Transformed, T> decoder_dummy(
@@ -714,7 +816,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
return reinterpret_cast(v);
}
},
- dummy_chunks, true
+ dummy_chunks, false /* remove sources */
);
while (!decoder.empty() && !decoder_dummy.empty()) {
@@ -770,7 +872,8 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
size_t num_threads,
double memory_preallocated,
const std::filesystem::path &tmp_dir,
- size_t max_disk_space)
+ size_t max_disk_space,
+ const BuildCheckpoint& checkpoint)
: kmer_collector_(k + 1,
both_strands_mode,
encode_filter_suffix_boss(filter_suffix),
@@ -779,26 +882,30 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
tmp_dir,
max_disk_space,
both_strands_mode && filter_suffix.empty() /* keep only canonical k-mers */),
- bits_per_count_(bits_per_count) {
- if (filter_suffix.size()
+ bits_per_count_(bits_per_count), checkpoint_(checkpoint) {
+ if (checkpoint.phase() == 0 && filter_suffix.size()
&& filter_suffix == std::string(filter_suffix.size(), BOSS::kSentinel)) {
            kmer_collector_.add_kmer(std::vector<TAlphabet>(k + 1, BOSS::kSentinelCode));
}
}
- void add_sequence(std::string_view sequence, uint64_t count) {
+ void add_sequence(std::string_view sequence, uint64_t count) override {
kmer_collector_.add_sequence(sequence, count);
}
-    void add_sequences(std::vector<std::string>&& sequences) {
+    void add_sequences(std::vector<std::string>&& sequences) override {
         kmer_collector_.add_sequences(std::move(sequences));
     }
-    void add_sequences(std::vector<std::pair<std::string, uint64_t>>&& sequences) {
+    void add_sequences(std::vector<std::pair<std::string, uint64_t>>&& sequences) override {
kmer_collector_.add_sequences(std::move(sequences));
}
- BOSS::Chunk* build_chunk() {
+ std::filesystem::path tmp_dir() const override {
+ return kmer_collector_.tmp_dir();
+ }
+
+ BOSS::Chunk* build_chunk() override {
BOSS::Chunk *result;
typename KmerCollector::Data &kmer_ints = kmer_collector_.data();
@@ -849,13 +956,14 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
return result;
}
- uint64_t get_k() const { return kmer_collector_.get_k() - 1; }
+ uint64_t get_k() const override { return kmer_collector_.get_k() - 1; }
private:
KmerCollector kmer_collector_;
uint8_t bits_per_count_;
/** Used as an async executor for merging chunks from disk */
ThreadPool async_worker_ = ThreadPool(1, 1);
+ BuildCheckpoint checkpoint_;
};
template <template <typename KMER> class KmerContainer, typename... Args>
@@ -950,9 +1058,10 @@ IBOSSChunkConstructor::initialize(size_t k,
double memory_preallocated,
kmer::ContainerType container_type,
const std::filesystem::path &tmp_dir,
- size_t max_disk_space_bytes) {
+ size_t max_disk_space_bytes,
+ const BuildCheckpoint& checkpoint) {
#define OTHER_ARGS k, canonical_mode, bits_per_count, filter_suffix, \
- num_threads, memory_preallocated, tmp_dir, max_disk_space_bytes
+ num_threads, memory_preallocated, tmp_dir, max_disk_space_bytes, checkpoint
switch (container_type) {
case kmer::ContainerType::VECTOR:
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
index fa40a71cce..0c22678a2a 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
@@ -9,6 +9,7 @@
#include "kmer/kmer_collector_config.hpp"
#include "graph/representation/base/dbg_construct.hpp"
#include "boss_chunk.hpp"
+#include "build_checkpoint.hpp"
namespace mtg {
@@ -28,9 +29,12 @@ class IBOSSChunkConstructor : public IGraphChunkConstructor {
double memory_preallocated = 0,
mtg::kmer::ContainerType container_type = mtg::kmer::ContainerType::VECTOR,
const std::filesystem::path &swap_dir = "/tmp/",
- size_t max_disk_space_bytes = 1e9);
+ size_t max_disk_space_bytes = 1e9,
+ const BuildCheckpoint& checkpoint = BuildCheckpoint("/tmp"));
virtual uint64_t get_k() const = 0;
+
+ virtual std::filesystem::path tmp_dir() const = 0;
};
} // namespace boss
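
Taken together, this first patch threads a BuildCheckpoint through the whole construction pipeline so an interrupted build can resume from the last completed phase. The raw phase numbers are renumbered in the next patch; by the end of patch 02 the mapping appears to be the following (a hypothetical enum for readability — the code stores plain integers):

    // Assumed meaning of the checkpoint phases after patch 02 renumbers them.
    enum BuildPhase : uint32_t {
        PHASE_NONE = 0,          // fresh build, nothing checkpointed
        PHASE_KMERS_ON_DISK = 1, // sorted real k-mer chunks written (chunk_*)
        PHASE_RC_ADDED = 2,      // reverse complements written ("original", rc/chunk_*)
        PHASE_SPLIT_DONE = 3,    // real k-mers split into the F x W chunks
        PHASE_DUMMY_1_DONE = 4,  // dummy-1 source and dummy sink k-mers generated
        PHASE_CONCAT_DONE = 5,   // blocks concatenated (dummy_sink, real_split_by_W_*)
        PHASE_DUMMY_K_DONE = 6,  // dummy source k-mers of all prefix lengths generated
    };
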
From 97294cdec9d753f753537beaab8a96c3d4102ca8 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Sat, 5 Sep 2020 14:02:32 +0200
Subject: [PATCH 02/51] First attempt
---
metagraph/src/cli/build.cpp | 5 +-
metagraph/src/common/elias_fano.cpp | 1 +
.../succinct/boss_chunk_construct.cpp | 488 +++++++++---------
3 files changed, 253 insertions(+), 241 deletions(-)
diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp
index 2b131f40ac..448ed61f8d 100644
--- a/metagraph/src/cli/build.cpp
+++ b/metagraph/src/cli/build.cpp
@@ -122,11 +122,8 @@ int build_graph(Config *config) {
checkpoint
);
- if (checkpoint.continuation_phase() == 0) {
+ if (checkpoint.phase() == 0) {
push_sequences(files, *config, timer, constructor.get());
- checkpoint.set_phase(1);
- checkpoint.set_kmer_dir(constructor->tmp_dir());
- checkpoint.store();
} else {
logger->info("Skipping parsing sequences from input file(s)");
}
diff --git a/metagraph/src/common/elias_fano.cpp b/metagraph/src/common/elias_fano.cpp
index 3fc1f46100..6a3165465c 100644
--- a/metagraph/src/common/elias_fano.cpp
+++ b/metagraph/src/common/elias_fano.cpp
@@ -37,6 +37,7 @@ std::vector<std::string> concat(const std::vector<std::string> &files, const std
original_files.push_back(f + suffix);
}
}
+ return original_files;
}
template
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
index 38f3c50d71..c0d7b4110f 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
@@ -244,7 +244,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
                          Vector<T> &kmers,
                          ChunkedWaitQueue<T> *kmers_out,
                          ThreadPool &async_worker,
-                         const BuildCheckpoint& ) {
+                         BuildCheckpoint* ) {
     using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $
     using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer
@@ -348,15 +348,14 @@ using Decoder = common::EliasFanoDecoder<T>;
 template <typename T_REAL>
 std::vector<std::string> split(size_t k,
                                const std::filesystem::path &dir,
-                               const ChunkedWaitQueue<T_REAL> &kmers) {
+                               const ChunkedWaitQueue<T_REAL> &kmers,
+                               BuildCheckpoint* checkpoint) {
     using T_INT_REAL = get_int_t<T_REAL>;
     const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size();
     size_t chunk_count = std::pow(alphabet_size, 2);
-    logger->trace("Splitting k-mers into {} chunks...", chunk_count);
-
     std::vector<Encoder<T_INT_REAL>> sinks;
     std::vector<std::string> names(chunk_count);
     for (size_t i = 0; i < names.size(); ++i) {
@@ -364,6 +363,12 @@ std::vector<std::string> split(size_t k,
sinks.emplace_back(names[i], ENCODER_BUFFER_SIZE);
}
+ if (checkpoint->phase() > 2) {
+ logger->info("Skipping splitting k-mers into chunks");
+ return names;
+ }
+
+ logger->info("Splitting k-mers into {} chunks...", chunk_count);
size_t num_kmers = 0;
for (auto &it = kmers.begin(); it != kmers.end(); ++it) {
const T_REAL &kmer = *it;
@@ -375,6 +380,10 @@ std::vector split(size_t k,
}
std::for_each(sinks.begin(), sinks.end(), [](auto &f) { f.finish(); });
logger->trace("Total number of real k-mers: {}", num_kmers);
+
+ checkpoint->set_phase(3);
+ checkpoint->store();
+
return names;
}
@@ -389,13 +398,60 @@ void skip_same_suffix(const KMER &el, Decoder &decoder, size_t suf) {
}
}
+std::pair<std::vector<std::string>, std::string>
+concatenate_chunks(const std::filesystem::path &dir,
+                   const std::vector<std::string> &dummy_sink_names,
+                   const std::vector<std::string> &real_F_W,
+                   BuildCheckpoint *checkpoint) {
+    const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size();
+
+    std::vector<std::string> real_split_by_W(alphabet_size);
+ std::string dummy_sink_name = dir / "dummy_sink";
+ for (TAlphabet W = 0; W < alphabet_size; ++W) {
+ real_split_by_W[W] = dir/("real_split_by_W_" + std::to_string(W));
+ }
+
+ if (checkpoint->phase() > 4) {
+ return { real_split_by_W, dummy_sink_name };
+ }
+
+ // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply
+ // concatenating the blocks will result in a single ordered block
+ logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...",
+ dummy_sink_names.size());
+    std::vector<std::string> to_delete
+ = common::concat(dummy_sink_names, dummy_sink_name);
+
+ // similarly, the 16 blocks of the original k-mers can be concatenated in
+ // groups of 4 without destroying the order
+ logger->trace("Concatenating blocks of original real k-mers ({} -> {})...",
+ real_F_W.size(), alphabet_size);
+ for (TAlphabet W = 0; W < alphabet_size; ++W) {
+        std::vector<std::string> blocks;
+ for (TAlphabet F = 0; F < alphabet_size; ++F) {
+ blocks.push_back(real_F_W[F * alphabet_size + W]);
+ }
+        std::vector<std::string> original
+ = common::concat(blocks, real_split_by_W[W]);
+ to_delete.insert(to_delete.end(), original.begin(), original.end());
+ }
+
+ for (const auto &name : to_delete) {
+ std::filesystem::remove(name);
+ }
+
+ checkpoint->set_phase(5);
+ checkpoint->store();
+ return { real_split_by_W, dummy_sink_name };
+}
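
concatenate_chunks() follows the resume pattern used throughout this series: compute the output names deterministically up front, return them immediately if the recorded phase says the step already ran, otherwise do the work, advance the phase, and persist it. Stripped to its shape (a sketch, not code from the patch):

    // Generic shape of a resumable step; 'phase_done' is the value recorded
    // once this step has completed.
    template <typename Work>
    void resumable_step(BuildCheckpoint *checkpoint, uint32_t phase_done, Work work) {
        if (checkpoint->phase() >= phase_done)
            return; // outputs are already on disk; skip the work
        work(); // produce the outputs (safe to re-run from scratch)
        checkpoint->set_phase(phase_done);
        checkpoint->store(); // persist progress before moving on
    }

Because the output names are derived only from #dir and the alphabet, a resumed run reconstructs the same names without redoing the work.
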
+
/**
* Generates non-redundant dummy-1 source k-mers and dummy sink kmers from #kmers.
* @return a triplet containing the names of the original k-mer blocks, the dummy-1 source
* k-mer blocks and the dummy sink k-mers
*/
template
-std::tuple<std::vector<std::string>, std::vector<std::string>, std::string>
+std::pair<std::vector<std::string>, std::vector<std::string>>
 generate_dummy_1_kmers(size_t k,
                        size_t num_threads,
                        const std::filesystem::path &dir,
@@ -408,9 +464,7 @@ generate_dummy_1_kmers(size_t k,
     using KMER_INT_REAL = typename KMER_REAL::WordType; // KmerExtractorT::KmerBOSS::WordType
     // for a DNA alphabet, this will contain 16 chunks, split by kmer[0] and kmer[1]
-    std::vector<std::string> real_F_W = split(k, dir, kmers);
-    checkpoint->set_phase(3);
-    checkpoint->store();
+    std::vector<std::string> real_F_W = split(k, dir, kmers, checkpoint);
const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size();
@@ -425,144 +479,84 @@ generate_dummy_1_kmers(size_t k,
dummy_sink_chunks.emplace_back(dummy_sink_names[i], ENCODER_BUFFER_SIZE);
}
- if (checkpoint->continuation_phase() < 3) {
- logger->info("Generating dummy-1 source k-mers and dummy sink k-mers...");
- uint64_t num_sink = 0;
- uint64_t num_source = 0;
+ if (checkpoint->phase() > 3) {
+        logger->info("Skipping generating dummy-1 source k-mers and dummy sink k-mers");
+ return { dummy_sink_names, real_F_W };
+ }
- static constexpr size_t L = KMER::kBitsPerChar;
- KMER_INT kmer_delta = kmer::get_sentinel_delta(L, k + 1);
- // reset kmer[1] (the first character in k-mer, $ in dummy source) to zero
- kmer_delta &= ~KMER_INT(((1ull << L) - 1) << L);
+ logger->info("Generating dummy-1 source k-mers and dummy sink k-mers...");
+ uint64_t num_sink = 0;
+ uint64_t num_source = 0;
- #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
- for (TAlphabet F = 0; F < alphabet_size; ++F) {
- // stream k-mers of pattern ***F*
-            std::vector<std::string> F_chunks(real_F_W.begin() + F * alphabet_size,
-                                              real_F_W.begin() + (F + 1) * alphabet_size);
-            common::MergeDecoder<KMER_INT_REAL> it(F_chunks, false);
-
-            std::vector<std::string> W_chunks; // chunks with k-mers of the form ****F
-            for (TAlphabet c = 0; c < alphabet_size; ++c) {
-                W_chunks.push_back(real_F_W[c * alphabet_size + F]);
-            }
-            common::ConcatDecoder<KMER_INT_REAL> sink_gen_it(W_chunks);
-
- while (!it.empty()) {
- KMER_REAL dummy_source(it.pop());
- // skip k-mers that would generate identical source dummy k-mers
- skip_same_suffix(dummy_source, it, 0);
- dummy_source.to_prev(k + 1, 0);
- // generate dummy sink k-mers from all non-dummy kmers smaller than |dummy_source|
- while (!sink_gen_it.empty() && sink_gen_it.top() < dummy_source.data()) {
- KMER_REAL v(sink_gen_it.pop());
- // skip k-mers with the same suffix as v, as they generate identical
- // dummy sink k-mers
- skip_same_suffix(v, sink_gen_it, 1);
- dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
- num_sink++;
- }
- if (!sink_gen_it.empty()) {
- KMER_REAL top(sink_gen_it.top());
- if (KMER_REAL::compare_suffix(top, dummy_source, 1)) {
- // The source dummy k-mer #dummy_source generated from #it is
- // redundant iff it shares its suffix with another real k-mer (#top).
- // In this case, #top generates a dummy sink k-mer redundant with
- // #it. So if #dummy_source is redundant, the sink generated from
- // #top is also redundant - so it's being skipped
- skip_same_suffix(top, sink_gen_it, 1);
- continue;
- }
- }
- // lift all and reset the first character to the sentinel 0 (apply mask)
- dummy_l1_chunks[F].add(kmer::transform(dummy_source, k + 1)
- + kmer_delta);
- num_source++;
- }
- // handle leftover sink_gen_it
- while (!sink_gen_it.empty()) {
+ static constexpr size_t L = KMER::kBitsPerChar;
+ KMER_INT kmer_delta = kmer::get_sentinel_delta(L, k + 1);
+ // reset kmer[1] (the first character in k-mer, $ in dummy source) to zero
+ kmer_delta &= ~KMER_INT(((1ull << L) - 1) << L);
+
+ #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
+ for (TAlphabet F = 0; F < alphabet_size; ++F) {
+ // stream k-mers of pattern ***F*
+        std::vector<std::string> F_chunks(real_F_W.begin() + F * alphabet_size,
+                                          real_F_W.begin() + (F + 1) * alphabet_size);
+        common::MergeDecoder<KMER_INT_REAL> it(F_chunks, false);
+
+        std::vector<std::string> W_chunks; // chunks with k-mers of the form ****F
+        for (TAlphabet c = 0; c < alphabet_size; ++c) {
+            W_chunks.push_back(real_F_W[c * alphabet_size + F]);
+        }
+        common::ConcatDecoder<KMER_INT_REAL> sink_gen_it(W_chunks);
+
+ while (!it.empty()) {
+ KMER_REAL dummy_source(it.pop());
+ // skip k-mers that would generate identical source dummy k-mers
+ skip_same_suffix(dummy_source, it, 0);
+ dummy_source.to_prev(k + 1, 0);
+ // generate dummy sink k-mers from all non-dummy kmers smaller than |dummy_source|
+ while (!sink_gen_it.empty() && sink_gen_it.top() < dummy_source.data()) {
KMER_REAL v(sink_gen_it.pop());
+ // skip k-mers with the same suffix as v, as they generate identical
+ // dummy sink k-mers
skip_same_suffix(v, sink_gen_it, 1);
dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
num_sink++;
}
+ if (!sink_gen_it.empty()) {
+ KMER_REAL top(sink_gen_it.top());
+ if (KMER_REAL::compare_suffix(top, dummy_source, 1)) {
+ // The source dummy k-mer #dummy_source generated from #it is
+ // redundant iff it shares its suffix with another real k-mer (#top).
+ // In this case, #top generates a dummy sink k-mer redundant with
+ // #it. So if #dummy_source is redundant, the sink generated from
+ // #top is also redundant - so it's being skipped
+ skip_same_suffix(top, sink_gen_it, 1);
+ continue;
+ }
+ }
+ // lift all and reset the first character to the sentinel 0 (apply mask)
+ dummy_l1_chunks[F].add(kmer::transform(dummy_source, k + 1)
+ + kmer_delta);
+ num_source++;
}
-
- for (TAlphabet i = 0; i < alphabet_size; ++i) {
- dummy_sink_chunks[i].finish();
- dummy_l1_chunks[i].finish();
+ // handle leftover sink_gen_it
+ while (!sink_gen_it.empty()) {
+ KMER_REAL v(sink_gen_it.pop());
+ skip_same_suffix(v, sink_gen_it, 1);
+ dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1));
+ num_sink++;
}
-
- logger->trace("Generated {} dummy sink and {} dummy source k-mers", num_sink,
- num_source);
- checkpoint->set_phase(3);
- checkpoint->store();
- } else {
-        logger->info("Skipping generating dummy-1 source k-mers and dummy sink k-mers");
}
-    std::vector<std::string> real_split_by_W(alphabet_size);
- std::string dummy_sink_name = dir / "dummy_sink";
- for (TAlphabet W = 0; W < alphabet_size; ++W) {
- real_split_by_W[W] = dir/("real_split_by_W_" + std::to_string(W));
- }
- if (checkpoint->continuation_phase() < 4) {
- // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply
- // concatenating the blocks will result in a single ordered block
- logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...",
- dummy_sink_names.size());
-        std::vector<std::string> to_delete
- = common::concat(dummy_sink_names, dummy_sink_name);
-
- // similarly, the 16 blocks of the original k-mers can be concatenated in
- // groups of 4 without destroying the order
- logger->trace("Concatenating blocks of original real k-mers ({} -> {})...",
- real_F_W.size(), alphabet_size);
- for (TAlphabet W = 0; W < alphabet_size; ++W) {
-            std::vector<std::string> blocks;
- for (TAlphabet F = 0; F < alphabet_size; ++F) {
- blocks.push_back(real_F_W[F * alphabet_size + W]);
- }
-            std::vector<std::string> original
- = common::concat(blocks, real_split_by_W[W]);
- to_delete.insert(to_delete.end(), original.begin(), original.end());
- }
- for (const auto &name : to_delete) {
- std::filesystem::remove(name);
- }
- checkpoint->set_phase(4);
- checkpoint->store();
+ for (TAlphabet i = 0; i < alphabet_size; ++i) {
+ dummy_sink_chunks[i].finish();
+ dummy_l1_chunks[i].finish();
}
- return { real_split_by_W, dummy_l1_names, dummy_sink_name };
-}
+ logger->trace("Generated {} dummy sink and {} dummy source k-mers", num_sink,
+ num_source);
+ checkpoint->set_phase(4);
+ checkpoint->store();
-/** Merges #original_kmers with #reverse_complements and places the result into #kmers */
-template <typename T, typename T_INT>
-static void merge(common::EliasFanoDecoder<T_INT> &original_kmers,
-                  ChunkedWaitQueue<T_INT> &reverse_complements,
-                  ChunkedWaitQueue<T> *kmers) {
-    auto &kmers_int = reinterpret_cast<ChunkedWaitQueue<T_INT> &>(*kmers);
-    auto &it = reverse_complements.begin();
-    std::optional<T_INT> orig = original_kmers.next();
- while (it != reverse_complements.end() && orig.has_value()) {
- if (get_first(orig.value()) < get_first(*it)) {
- kmers_int.push(orig.value());
- orig = original_kmers.next();
- } else {
- kmers_int.push(*it);
- ++it;
- }
- }
- while (it != reverse_complements.end()) {
- kmers_int.push(*it);
- ++it;
- }
- while (orig.has_value()) {
- kmers_int.push(orig.value());
- orig = original_kmers.next();
- }
- kmers->shutdown();
+ return { dummy_sink_names, real_F_W };
}
/**
@@ -576,17 +570,49 @@ void add_reverse_complements(size_t k,
ThreadPool& async_worker,
                             ChunkedWaitQueue<T_REAL> *kmers,
BuildCheckpoint *checkpoint) {
+ if (checkpoint->phase() > 2) {
+ logger->info("Skipping generating reverse complements");
+ return;
+ }
     using T_INT_REAL = get_int_t<T_REAL>; // either KMER_INT or <KMER_INT, count>
-    std::string rc_dir = dir/"rc";
-    std::filesystem::create_directory(rc_dir);
-    auto rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
-            num_threads, buffer_size, rc_dir, std::numeric_limits<size_t>::max());
-
-    common::EliasFanoEncoderBuffered<T_INT_REAL> original(dir/"original", ENCODER_BUFFER_SIZE);
-    Vector<T_INT_REAL> buffer;
-    buffer.reserve(10'000);
-    if (checkpoint->continuation_phase() < 2) {
+    std::vector<std::string> to_merge = { dir/"original" };
+ if (checkpoint->phase() == 2) {
+ logger->info(
+ "Continuing from checkpoint phase 2. Looking for 'original' and "
+ "'rc/chunk_*' in {}",
+ checkpoint->kmer_dir());
+ if (!std::filesystem::exists(checkpoint->kmer_dir()/"original")) {
+ logger->error(
+ "Could not find {}. Recovery not possible. Remove tmp dir to "
+ "restart the computation.",
+ checkpoint->kmer_dir()/"original");
+ std::exit(1);
+ }
+ for (const auto &path : std::filesystem::directory_iterator(checkpoint->kmer_dir()/"rc")) {
+ if (path.is_regular_file()
+ && path.path().filename().string().find("chunk_", 0) == 0
+ && path.path().filename().extension() == "") {
+ logger->trace("Found chunk: {}", path.path().string());
+ to_merge.push_back(path.path().string());
+ }
+ }
+ if (to_merge.size() == 1) {
+ logger->error(
+ "Could not find chunk_* files in {}. Recovery not possible. "
+ "Remove temp dir to restart the computation from scratch.",
+ checkpoint->kmer_dir());
+ std::exit(1);
+ }
+ } else { // checkpoint->phase() < 2
+ std::string rc_dir = dir/"rc";
+ std::filesystem::create_directory(rc_dir);
+        auto rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
+                num_threads, buffer_size, rc_dir, std::numeric_limits<size_t>::max());
+
+        common::EliasFanoEncoderBuffered<T_INT_REAL> original(dir/"original", ENCODER_BUFFER_SIZE);
+        Vector<T_INT_REAL> buffer;
+ buffer.reserve(10'000);
logger->info("Adding reverse complements...");
for (auto &it = kmers->begin(); it != kmers->end(); ++it) {
const T_REAL &kmer = *it;
@@ -613,55 +639,24 @@ void add_reverse_complements(size_t k,
}
}
rc_set->insert(buffer.begin(), buffer.end());
+        std::vector<std::string> to_insert = rc_set->files_to_merge();
+ to_merge.insert(to_merge.end(), to_insert.begin(), to_insert.end());
original.finish();
checkpoint->set_phase(2);
checkpoint->store();
- } else {
- logger->info("Skipping adding reverse complements");
- }
-
- if (checkpoint->continuation_phase() == 2) {
- logger->info(
- "Continuing from checkpoint phase 2. Looking for 'original' and "
- "'rc/chunk_*' in {}",
- checkpoint->kmer_dir());
- if (!std::filesystem::exists(checkpoint->kmer_dir()/"original")) {
- logger->error(
-                "Could not find {}. Recovery not possible. Remove the tmp dir to "
-                "restart the computation.",
- checkpoint->kmer_dir()/"original");
- }
-        std::vector<std::string> file_names;
- for (const auto &path : std::filesystem::directory_iterator(checkpoint->kmer_dir()/"rc")) {
- if (path.is_regular_file()
- && path.path().filename().string().find("chunk_", 0) == 0
- && path.path().filename().extension() == "") {
- logger->trace("Found chunk: {}", path.path().string());
- file_names.push_back(path.path().string());
- }
- }
- if (file_names.empty()) {
- logger->error(
- "Could not find chunk_* files in {}. Recovery not possible. "
- "Remove temp dir to restart the computation from scratch.",
- checkpoint->kmer_dir());
- std::exit(1);
- }
- rc_set.reset();
- async_worker.enqueue([kmers, file_names = std::move(file_names)]() {
-            std::function<void(const T_INT_REAL &)> on_new_item
-                    = [&kmers, &file_names](const T_INT_REAL &v) { rc_set.push(v); };
- merge_files(file_names, on_new_item, false);
- rc_set.shutdown();
- });
}
// start merging #original with #reverse_complements into #kmers
kmers->reset();
- async_worker.enqueue([rc_set = std::move(rc_set), &dir, kmers]() {
-        ChunkedWaitQueue<T_INT_REAL> &reverse_complements = rc_set->data(true);
-        common::EliasFanoDecoder<T_INT_REAL> original_kmers(dir/"original");
- merge(original_kmers, reverse_complements, kmers);
+ async_worker.enqueue([rc_files = std::move(to_merge), kmers]() {
+        common::MergeDecoder<T_INT_REAL> chunked_kmers(rc_files, false);
+
+        auto &kmers_int = reinterpret_cast<ChunkedWaitQueue<T_INT_REAL> &>(*kmers);
+        std::optional<T_INT_REAL> kmer;
+ while ((kmer = chunked_kmers.pop()).has_value()) {
+ kmers_int.push(kmer.value());
+ }
+ kmers->shutdown();
});
}
@@ -688,13 +683,18 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
     using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $
using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer
+ uint32_t previous_phase = checkpoint->phase();
+ if (checkpoint->phase() == 0) {
+ checkpoint->set_kmer_dir(kmer_collector.tmp_dir());
+ checkpoint->set_phase(1);
+ checkpoint->store();
+ }
+
size_t k = kmer_collector.get_k() - 1;
- const std::filesystem::path dir = checkpoint->continuation_phase() == 0
- ? kmer_collector.tmp_dir()
- : checkpoint->kmer_dir();
+ const std::filesystem::path dir = checkpoint->kmer_dir();
size_t num_threads = kmer_collector.num_threads();
- if (checkpoint->continuation_phase() == 1) {
+ if (previous_phase == 1) {
logger->info(
"Continuing from checkpoint phase 1. Looking for chunk_* files in {}",
checkpoint->kmer_dir());
@@ -715,11 +715,14 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
std::exit(1);
}
kmers.reset();
- async_worker.enqueue([kmers, file_names = std::move(file_names)]() {
-        std::function<void(const T &)> on_new_item
-            = [&kmers, &file_names](const T &v) { kmers.push(v); };
- merge_files(file_names, on_new_item, false);
+ async_worker.enqueue([&kmers, file_names = std::move(file_names)]() {
+        auto &kmers_int = reinterpret_cast<ChunkedWaitQueue<T_INT_REAL> &>(kmers);
+        std::function<void(const T_INT_REAL &)> on_new_item
+                = [&kmers_int](const T_INT_REAL &v) { kmers_int.push(v); };
+ common::merge_files(file_names, on_new_item, false);
kmers.shutdown();
+ std::for_each(file_names.begin(), file_names.end(),
+ [](const auto &f) { std::filesystem::remove(f); });
});
}
@@ -729,55 +732,66 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
async_worker, &kmers, checkpoint);
}
-    std::string dummy_sink_name;
-    std::vector<std::string> real_split_by_W;
-    std::vector<std::string> dummy_names;
-    std::tie(real_split_by_W, dummy_names, dummy_sink_name)
+ auto [dummy_sink_names, real_F_W]
= generate_dummy_1_kmers(k, num_threads, dir, kmers, checkpoint);
- // file names for the dummy_sink_0..3 and dummy_source_0..k_0..3 kmers
-    std::vector<std::string> dummy_chunks = { dummy_sink_name };
- // generate dummy k-mers of prefix length 1..k
- logger->trace("Starting generating dummy-1..k source k-mers...");
- for (size_t dummy_pref_len = 1; dummy_pref_len <= k; ++dummy_pref_len) {
- for (const std::string &f : dummy_names) {
- dummy_chunks.push_back(f);
- }
+    std::vector<std::string> real_split_by_W;
+ std::string dummy_sink_name;
+ std::tie(real_split_by_W, dummy_sink_name)
+ = concatenate_chunks(dir, dummy_sink_names, real_F_W, checkpoint);
- const uint8_t alphabet_size = KmerExtractorBOSS::alphabet.size();
-        std::vector<std::string> dummy_next_names(alphabet_size);
-        std::vector<Encoder<KMER_INT>> dummy_next_chunks;
+ // file names for the dummy_sink and dummy_source_1..k_0..3 kmers
+    std::vector<std::string> dummy_chunk_names;
+ const uint8_t alphabet_size = KmerExtractorBOSS::alphabet.size();
+ for (size_t dummy_pref_len = 1; dummy_pref_len <= k; ++dummy_pref_len) {
for (TAlphabet i = 0; i < alphabet_size; ++i) {
- dummy_next_names[i] = dir/("dummy_source_"
- + std::to_string(dummy_pref_len + 1) + "_" + std::to_string(i));
- dummy_next_chunks.emplace_back(dummy_next_names[i], ENCODER_BUFFER_SIZE);
+ std::string suffix
+ = std::to_string(dummy_pref_len + 1) + "_" + std::to_string(i);
+ dummy_chunk_names.push_back(dir/("dummy_source_" + suffix));
}
+ }
+ dummy_chunk_names.push_back(dummy_sink_name);
- KMER prev_kmer(0);
- uint64_t num_kmers = 0;
-        const std::function<void(const KMER_INT &)> &write_dummy = [&](const KMER_INT &v) {
- KMER kmer(v);
- kmer.to_prev(k + 1, BOSS::kSentinelCode);
- if (prev_kmer != kmer) {
- dummy_next_chunks[kmer[0]].add(kmer.data());
- prev_kmer = std::move(kmer);
+ if (checkpoint->phase() < 6) {
+ // generate dummy k-mers of prefix length 1..k
+ logger->trace("Starting generating dummy-1..{} source k-mers...", k);
+ for (size_t dummy_pref_len = 2; dummy_pref_len <= k; ++dummy_pref_len) {
+
+            std::vector<Encoder<KMER_INT>> next_chunks;
+ for (TAlphabet i = 0; i < alphabet_size; ++i) {
+ next_chunks.emplace_back(dummy_chunk_names[dummy_pref_len * alphabet_size + i],
+ ENCODER_BUFFER_SIZE);
}
- num_kmers++;
- };
- common::merge_files(dummy_names, write_dummy, false);
-
- std::for_each(dummy_next_chunks.begin(), dummy_next_chunks.end(),
- [](auto &v) { v.finish(); });
- dummy_names = std::move(dummy_next_names);
- logger->trace("Number of dummy k-mers with dummy prefix of length {}: {}",
- dummy_pref_len, num_kmers);
- }
- // remove the last chunks with .up and .count
-    const std::function<void(const KMER_INT &)> on_merge = [](const KMER_INT &) {};
- common::merge_files(dummy_names, on_merge);
- checkpoint->set_phase(6);
- checkpoint->store();
+ // the chunks containing (dummy_pref_len-1) dummy k-mers
+ auto begin = dummy_chunk_names.begin() + (dummy_pref_len - 1) * alphabet_size;
+ std::vector current_names(begin, begin + alphabet_size);
+
+ KMER prev_kmer(0);
+ uint64_t num_kmers = 0;
+            const std::function<void(const KMER_INT &)> &write_dummy
+                    = [&](const KMER_INT &v) {
+ KMER kmer(v);
+ kmer.to_prev(k + 1, BOSS::kSentinelCode);
+ if (prev_kmer != kmer) {
+ next_chunks[kmer[0]].add(kmer.data());
+ prev_kmer = std::move(kmer);
+ }
+ num_kmers++;
+ };
+ common::merge_files(current_names, write_dummy, false);
+
+ std::for_each(next_chunks.begin(), next_chunks.end(),
+ [](auto &v) { v.finish(); });
+ logger->trace("Number of dummy k-mers with dummy prefix of length {}: {}",
+ dummy_pref_len - 1, num_kmers);
+ }
+
+ checkpoint->set_phase(6);
+ checkpoint->store();
+ } else {
+ logger->info("Skipping generating dummy-1..{} source k-mers", k);
+ }
// at this point, we have the original k-mers in real_split_by_W, the dummy-x k-mers
// in dummy_chunks, and we merge them all into a single stream
@@ -794,7 +808,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
= kmer::get_sentinel_delta(KMER::kBitsPerChar, k + 1);
// push all other dummy and non-dummy k-mers to |kmers_out|
- async_worker.enqueue([k, kmer_delta, kmers_out, real_split_by_W, dummy_chunks]() {
+ async_worker.enqueue([k, kmer_delta, kmers_out, real_split_by_W, dummy_chunk_names]() {
common::Transformed, T> decoder(
[&](const T_INT_REAL &v) {
if constexpr (utils::is_pair_v) {
@@ -816,7 +830,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
return reinterpret_cast(v);
}
},
- dummy_chunks, false /* remove sources */
+ dummy_chunk_names, false /* remove sources */
);
while (!decoder.empty() && !decoder_dummy.empty()) {
@@ -883,7 +897,7 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
max_disk_space,
both_strands_mode && filter_suffix.empty() /* keep only canonical k-mers */),
bits_per_count_(bits_per_count), checkpoint_(checkpoint) {
- if (checkpoint.phase() == 0 && filter_suffix.size()
+ if (filter_suffix.size()
&& filter_suffix == std::string(filter_suffix.size(), BOSS::kSentinel)) {
             kmer_collector_.add_kmer(std::vector<TAlphabet>(k + 1, BOSS::kSentinelCode));
}
@@ -933,7 +947,7 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
#define INIT_CHUNK(KMER) \
ChunkedWaitQueue> queue(ENCODER_BUFFER_SIZE); \
- recover_dummy_nodes(kmer_collector_, kmers, &queue, async_worker_); \
+ recover_dummy_nodes(kmer_collector_, kmers, &queue, async_worker_, &checkpoint_); \
logger->trace("Dummy source k-mers were reconstructed in {} sec", timer.elapsed()); \
result = new BOSS::Chunk(KmerExtractorBOSS().alphabet.size(), \
kmer_collector_.get_k() - 1, \
@@ -961,7 +975,7 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
private:
KmerCollector kmer_collector_;
uint8_t bits_per_count_;
- /** Used as an async executor for merging chunks from disk */
+ /** Async executor for merging chunks, generating reverse complements, etc. */
ThreadPool async_worker_ = ThreadPool(1, 1);
BuildCheckpoint checkpoint_;
};
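
The clear(tmp_path, remove_files) change is what makes phase 2 recoverable: the constructor grabs the chunk file names with files_to_merge(), releases the SortedSetDisk buffer, but keeps the chunk files on disk so they can be re-merged after a restart. The intended call sequence, as used in add_reverse_complements() above:

    // Keep the rc chunk files for checkpoint recovery; only the in-memory
    // buffer and the merge state are reset.
    std::vector<std::string> to_insert = rc_set->files_to_merge();
    to_merge.insert(to_merge.end(), to_insert.begin(), to_insert.end());
    rc_set->clear(dir, false /* don't delete chunk files! */);
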
From 1f731ff18f21558a1b44a8a16e88b3e802d7ab80 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Sun, 6 Sep 2020 18:42:33 +0200
Subject: [PATCH 03/51] Running
---
metagraph/src/common/elias_fano.cpp | 1 -
.../sorted_sets/sorted_set_disk_base.cpp | 10 ++--
.../sorted_sets/sorted_set_disk_base.hpp | 2 +-
.../succinct/boss_chunk_construct.cpp | 55 +++++++++++--------
4 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/metagraph/src/common/elias_fano.cpp b/metagraph/src/common/elias_fano.cpp
index 6a3165465c..f8a6170e5f 100644
--- a/metagraph/src/common/elias_fano.cpp
+++ b/metagraph/src/common/elias_fano.cpp
@@ -29,7 +29,6 @@ std::vector<std::string> concat(const std::vector<std::string> &files, const std
concat_command += files[i] + suffix + " ";
}
concat_command += " > " + result + suffix;
- logger->trace("Executing '{}'", concat_command);
if (std::system(concat_command.c_str()))
throw std::runtime_error("Error while cat-ing files: " + concat_command);
diff --git a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
index 49c8bc7092..87678abe20 100644
--- a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
+++ b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
@@ -64,13 +64,15 @@ std::vector<std::string> SortedSetDiskBase<T>::files_to_merge() {
 }
 template <typename T>
-void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path) {
+void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path, bool remove_files) {
std::unique_lock exclusive_lock(mutex_);
std::unique_lock multi_insert_lock(multi_insert_mutex_);
is_merging_ = false;
- // remove the files that have not been requested to merge
- for (const auto &chunk_file : get_file_names()) {
- std::filesystem::remove(chunk_file);
+ if (remove_files) {
+ // remove the files that have not been requested to merge
+ for (const auto &chunk_file : get_file_names()) {
+ std::filesystem::remove(chunk_file);
+ }
}
chunk_count_ = 0;
l1_chunk_count_ = 0;
diff --git a/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp b/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp
index bc60cb4342..2ba2603723 100644
--- a/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp
+++ b/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp
@@ -74,7 +74,7 @@ class SortedSetDiskBase {
* sorted set may be expensive when #data_ is large. In these cases, prefer calling
* #clear and re-using the buffer.
*/
- void clear(const std::filesystem::path &tmp_path = "/tmp/");
+ void clear(const std::filesystem::path &tmp_path = "/tmp/", bool remove_files = true);
protected:
/** Advances #it by step or points to #end, whichever comes first. */
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
index c0d7b4110f..5a6f7ca1e4 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
@@ -360,7 +360,6 @@ std::vector<std::string> split(size_t k,
     std::vector<std::string> names(chunk_count);
for (size_t i = 0; i < names.size(); ++i) {
names[i] = dir/("real_F_W_" + std::to_string(i));
- sinks.emplace_back(names[i], ENCODER_BUFFER_SIZE);
}
if (checkpoint->phase() > 2) {
@@ -368,6 +367,10 @@ std::vector<std::string> split(size_t k,
return names;
}
+ for (size_t i = 0; i < names.size(); ++i) {
+ sinks.emplace_back(names[i], ENCODER_BUFFER_SIZE);
+ }
+
logger->info("Splitting k-mers into {} chunks...", chunk_count);
size_t num_kmers = 0;
for (auto &it = kmers.begin(); it != kmers.end(); ++it) {
@@ -475,8 +478,6 @@ generate_dummy_1_kmers(size_t k,
for (TAlphabet i = 0; i < alphabet_size; ++i) {
dummy_l1_names[i] = dir/("dummy_source_1_" + std::to_string(i));
dummy_sink_names[i] = dir/("dummy_sink_" + std::to_string(i));
- dummy_l1_chunks.emplace_back(dummy_l1_names[i], ENCODER_BUFFER_SIZE);
- dummy_sink_chunks.emplace_back(dummy_sink_names[i], ENCODER_BUFFER_SIZE);
}
if (checkpoint->phase() > 3) {
@@ -484,6 +485,11 @@ generate_dummy_1_kmers(size_t k,
return { dummy_sink_names, real_F_W };
}
+ for (TAlphabet i = 0; i < alphabet_size; ++i) {
+ dummy_l1_chunks.emplace_back(dummy_l1_names[i], ENCODER_BUFFER_SIZE);
+ dummy_sink_chunks.emplace_back(dummy_sink_names[i], ENCODER_BUFFER_SIZE);
+ }
+
logger->info("Generating dummy-1 source k-mers and dummy sink k-mers...");
uint64_t num_sink = 0;
uint64_t num_source = 0;
@@ -576,6 +582,7 @@ void add_reverse_complements(size_t k,
}
using T_INT_REAL = get_int_t; // either KMER_INT or
+    std::unique_ptr<common::SortedSetDisk<T_INT_REAL>> rc_set;
     std::vector<std::string> to_merge = { dir/"original" };
if (checkpoint->phase() == 2) {
logger->info(
@@ -607,7 +614,7 @@ void add_reverse_complements(size_t k,
} else { // checkpoint->phase() < 2
std::string rc_dir = dir/"rc";
std::filesystem::create_directory(rc_dir);
-        auto rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
+        rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
                 num_threads, buffer_size, rc_dir, std::numeric_limits<size_t>::max());
         common::EliasFanoEncoderBuffered<T_INT_REAL> original(dir/"original", ENCODER_BUFFER_SIZE);
@@ -641,6 +648,7 @@ void add_reverse_complements(size_t k,
rc_set->insert(buffer.begin(), buffer.end());
         std::vector<std::string> to_insert = rc_set->files_to_merge();
to_merge.insert(to_merge.end(), to_insert.begin(), to_insert.end());
+ rc_set->clear(dir, false /* don't delete chunk files! */);
original.finish();
checkpoint->set_phase(2);
checkpoint->store();
@@ -648,13 +656,11 @@ void add_reverse_complements(size_t k,
// start merging #original with #reverse_complements into #kmers
kmers->reset();
- async_worker.enqueue([rc_files = std::move(to_merge), kmers]() {
-        common::MergeDecoder<T_INT_REAL> chunked_kmers(rc_files, false);
-
+    async_worker.enqueue([to_merge = std::move(to_merge), kmers]() {
+        common::MergeDecoder<T_INT_REAL> chunked_kmers(to_merge, false);
         auto &kmers_int = reinterpret_cast<ChunkedWaitQueue<T_INT_REAL> &>(*kmers);
-        std::optional<T_INT_REAL> kmer;
- while ((kmer = chunked_kmers.pop()).has_value()) {
- kmers_int.push(kmer.value());
+ while (!chunked_kmers.empty()) {
+ kmers_int.push(chunked_kmers.pop());
}
kmers->shutdown();
});
@@ -672,16 +678,16 @@ void add_reverse_complements(size_t k,
* the dummy-k kmers, for k=2..k
*/
template
-void recover_dummy_nodes(const KmerCollector &kmer_collector,
+[[clang::optnone]] void recover_dummy_nodes(const KmerCollector &kmer_collector,
ChunkedWaitQueue &kmers,
ChunkedWaitQueue *kmers_out,
ThreadPool &async_worker,
BuildCheckpoint* checkpoint) {
- using KMER_REAL = get_first_type_t<T_REAL>; // 64/128/256-bit KmerBOSS
- using T_INT_REAL = get_int_t<T_REAL>; // either KMER_INT or <KMER_INT, count>
+ using KMER_REAL = get_first_type_t<T_REAL>; // 64/128/256-bit KmerBOSS on 2 bits
+ using T_INT_REAL = get_int_t<T_REAL>; // either KMER_REAL or <KMER_REAL, count>
- using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $
- using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer
+ using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $ (on 3 bits)
+ using KMER_INT = typename KMER::WordType; // the 64/128/256-bit integer in KMER
uint32_t previous_phase = checkpoint->phase();
if (checkpoint->phase() == 0) {
@@ -742,11 +748,10 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
// file names for the dummy_sink and dummy_source_1..k_0..3 kmers
std::vector<std::string> dummy_chunk_names;
- const uint8_t alphabet_size = KmerExtractorBOSS::alphabet.size();
+ const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size();
for (size_t dummy_pref_len = 1; dummy_pref_len <= k; ++dummy_pref_len) {
for (TAlphabet i = 0; i < alphabet_size; ++i) {
- std::string suffix
- = std::to_string(dummy_pref_len + 1) + "_" + std::to_string(i);
+ std::string suffix = std::to_string(dummy_pref_len) + "_" + std::to_string(i);
dummy_chunk_names.push_back(dir/("dummy_source_" + suffix));
}
}
@@ -755,15 +760,16 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
if (checkpoint->phase() < 6) {
// generate dummy k-mers of prefix length 1..k
logger->trace("Starting generating dummy-1..{} source k-mers...", k);
- for (size_t dummy_pref_len = 2; dummy_pref_len <= k; ++dummy_pref_len) {
+ for (size_t dummy_pref_len = 1; dummy_pref_len < k; ++dummy_pref_len) {
std::vector<common::EliasFanoEncoderBuffered<KMER_INT>> next_chunks;
for (TAlphabet i = 0; i < alphabet_size; ++i) {
- next_chunks.emplace_back(dummy_chunk_names[dummy_pref_len * alphabet_size + i],
- ENCODER_BUFFER_SIZE);
+ next_chunks.emplace_back(
+ dummy_chunk_names[dummy_pref_len * alphabet_size + i],
+ ENCODER_BUFFER_SIZE);
}
- // the chunks containing (dummy_pref_len-1) dummy k-mers
+ // chunks containing dummy k-mers of prefix length dummy_pref_len
auto begin = dummy_chunk_names.begin() + (dummy_pref_len - 1) * alphabet_size;
std::vector<std::string> current_names(begin, begin + alphabet_size);
@@ -772,9 +778,10 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
const std::function<void(const KMER_INT &)> &write_dummy
= [&](const KMER_INT &v) {
KMER kmer(v);
+ assert(kmer[0]);
kmer.to_prev(k + 1, BOSS::kSentinelCode);
if (prev_kmer != kmer) {
- next_chunks[kmer[0]].add(kmer.data());
+ next_chunks[kmer[0] - 1].add(kmer.data());
prev_kmer = std::move(kmer);
}
num_kmers++;
@@ -784,7 +791,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector,
std::for_each(next_chunks.begin(), next_chunks.end(),
[](auto &v) { v.finish(); });
logger->trace("Number of dummy k-mers with dummy prefix of length {}: {}",
- dummy_pref_len - 1, num_kmers);
+ dummy_pref_len, num_kmers);
}
checkpoint->set_phase(6);
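For intuition, each round above turns the sorted dummy-p source k-mers into dummy-(p+1) source k-mers by prepending another sentinel via to_prev, and the #prev_kmer comparison drops duplicates, which are adjacent because the input chunks are sorted. A self-contained sketch on plain strings (this to_prev is a hypothetical string analogue of KMER::to_prev, not the project's):

#include <iostream>
#include <string>
#include <vector>

// String analogue of KMER::to_prev(k + 1, kSentinelCode): the predecessor
// of a k-mer is obtained by dropping its last character and prepending '$'.
std::string to_prev(const std::string &s) {
    return "$" + s.substr(0, s.size() - 1);
}

int main() {
    // sorted dummy-1 source k-mers ('$' followed by k real characters)
    std::vector<std::string> dummy1 = { "$ACG", "$ACT", "$CCG" };
    std::vector<std::string> dummy2;
    std::string prev;
    for (const std::string &kmer : dummy1) {
        std::string next = to_prev(kmer);  // e.g. "$ACG" -> "$$AC"
        if (next != prev) {  // sorted input => duplicates are consecutive
            dummy2.push_back(next);
            prev = next;
        }
    }
    for (const std::string &s : dummy2)
        std::cout << s << '\n';  // prints $$AC and $$CC
}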
From 7e981bad14433d9c8fc2cb6b8d8695fa9e46bb36 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Sun, 6 Sep 2020 21:29:45 +0200
Subject: [PATCH 04/51] Clear checkpoint
---
metagraph/src/cli/build.cpp | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp
index 448ed61f8d..4d5011f9f0 100644
--- a/metagraph/src/cli/build.cpp
+++ b/metagraph/src/cli/build.cpp
@@ -107,6 +107,13 @@ int build_graph(Config *config) {
}
boss::BuildCheckpoint checkpoint(config->tmp_dir);
+ if (checkpoint.phase() > 0 && suffixes.size() > 1) {
+ logger->error(
+ "Checkpointing for multiple chunks not supported. "
+ "Remove {} or continue building chunk by chunk",
+ checkpoint.checkpoint_file());
+ std::exit(1);
+ }
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
@@ -148,6 +155,7 @@ int build_graph(Config *config) {
} else {
graph_data.reset(next_chunk);
}
+ checkpoint.done();
}
assert(graph_data);
From b9fbc53281b4637817fc46506c87b06b054f3ff0 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Mon, 7 Sep 2020 09:36:59 +0200
Subject: [PATCH 05/51] Small
---
.../src/graph/representation/succinct/boss_chunk_construct.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
index 5a6f7ca1e4..54b03aa369 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
@@ -837,7 +837,7 @@ template
return reinterpret_cast(v);
}
},
- dummy_chunk_names, false /* remove sources */
+ dummy_chunk_names, false /* remove sources */
);
while (!decoder.empty() && !decoder_dummy.empty()) {
From d83368ddb72ae6771f997294a989bce3f5682ded Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Mon, 7 Sep 2020 12:03:42 +0200
Subject: [PATCH 06/51] Add --phase
---
metagraph/src/cli/build.cpp | 10 +--------
metagraph/src/cli/config/config.cpp | 21 +++++++++++++++++++
metagraph/src/cli/config/config.hpp | 2 ++
.../succinct/boss_chunk_construct.hpp | 2 +-
4 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp
index 4d5011f9f0..5b47e297fc 100644
--- a/metagraph/src/cli/build.cpp
+++ b/metagraph/src/cli/build.cpp
@@ -106,15 +106,7 @@ int build_graph(Config *config) {
logger->info("k-mer suffix: '{}'", suffix);
}
- boss::BuildCheckpoint checkpoint(config->tmp_dir);
- if (checkpoint.phase() > 0 && suffixes.size() > 1) {
- logger->error(
- "Checkpointing for multiple chunks not supported. "
- "Remove {} or continue building chunk by chunk",
- checkpoint.checkpoint_file());
- std::exit(1);
- }
-
+ boss::BuildCheckpoint checkpoint(config->checkpoint, config->tmp_dir);
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
config->canonical,
diff --git a/metagraph/src/cli/config/config.cpp b/metagraph/src/cli/config/config.cpp
index b637704a55..6fe6d9bc84 100644
--- a/metagraph/src/cli/config/config.cpp
+++ b/metagraph/src/cli/config/config.cpp
@@ -334,6 +334,8 @@ Config::Config(int argc, char *argv[]) {
tmp_dir = get_value(i++);
} else if (!strcmp(argv[i], "--disk-cap-gb")) {
disk_cap_bytes = atoi(get_value(i++)) * 1e9;
+ } else if (!strcmp(argv[i], "--checkpoint")) {
+ checkpoint = true;
} else if (argv[i][0] == '-') {
fprintf(stderr, "\nERROR: Unknown option %s\n\n", argv[i]);
print_usage(argv[0], identity);
@@ -521,6 +523,24 @@ Config::Config(int argc, char *argv[]) {
if (identity == COMPARE && fnames.size() != 2)
print_usage_and_exit = true;
+ if (identity != BUILD && checkpoint) {
+ std::cerr << "Error: Checkpointing is only supported for disk-based building. "
+ "Remove --checkpoint.";
+ print_usage_and_exit = true;
+ }
+
+ if (checkpoint && tmp_dir.empty()) {
+ std::cerr << "Error: Checkpointing is only supported for disk-based building. "
+ "Please set --disk-swap.";
+ print_usage_and_exit = true;
+ }
+
+ if (checkpoint && suffix_len > 0) {
+ std::cerr << "Error: Checkpointing not supported for multiple suffixes. "
+ "Remove --checkpoint or specify each suffix separately using --suffix";
+ print_usage_and_exit = true;
+ }
+
if (discovery_fraction < 0 || discovery_fraction > 1)
print_usage_and_exit = true;
@@ -751,6 +771,7 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n");
fprintf(stderr, "\t --disk-swap [STR] \tdirectory to use for temporary files [off]\n");
fprintf(stderr, "\t --disk-cap-gb [INT] \tmax temp disk space to use before forcing a merge, in GB [20]\n");
+ fprintf(stderr, "\t --checkpoint \t whether to save intermediate state in --disk-swap in order to resume an interrupted computation [off]\n");
} break;
case CLEAN: {
fprintf(stderr, "Usage: %s clean -o [options] GRAPH\n\n", prog_name.c_str());
diff --git a/metagraph/src/cli/config/config.hpp b/metagraph/src/cli/config/config.hpp
index e9c435fe7e..9b2fb3764d 100644
--- a/metagraph/src/cli/config/config.hpp
+++ b/metagraph/src/cli/config/config.hpp
@@ -142,6 +142,8 @@ class Config {
size_t disk_cap_bytes = 20e9; // 20GB default
+ bool checkpoint = false;
+
enum IdentityType {
NO_IDENTITY = -1,
BUILD = 1,
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
index 0c22678a2a..8950b3be88 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.hpp
@@ -30,7 +30,7 @@ class IBOSSChunkConstructor : public IGraphChunkConstructor {
mtg::kmer::ContainerType container_type = mtg::kmer::ContainerType::VECTOR,
const std::filesystem::path &swap_dir = "/tmp/",
size_t max_disk_space_bytes = 1e9,
- const BuildCheckpoint& checkpoint = BuildCheckpoint("/tmp"));
+ const BuildCheckpoint& checkpoint = BuildCheckpoint(false, ""));
virtual uint64_t get_k() const = 0;
From 0b27107079c94941a3efb7cd7957047e0a55a7d6 Mon Sep 17 00:00:00 2001
From: Daniel Danciu
Date: Tue, 8 Sep 2020 15:58:17 +0200
Subject: [PATCH 07/51] Working checkpointing
---
metagraph/src/cli/build.cpp | 12 ++-
metagraph/src/cli/config/config.cpp | 21 +++---
metagraph/src/cli/config/config.hpp | 2 +-
metagraph/src/common/utils/file_utils.cpp | 7 +-
metagraph/src/common/utils/file_utils.hpp | 3 +-
.../succinct/boss_chunk_construct.cpp | 74 ++++++++++---------
.../succinct/boss_chunk_construct.hpp | 2 +-
.../succinct/build_checkpoint.cpp | 54 ++++++++++++++
.../succinct/build_checkpoint.hpp | 40 ++++++++++
metagraph/src/kmer/kmer_collector.cpp | 3 +-
10 files changed, 165 insertions(+), 53 deletions(-)
create mode 100644 metagraph/src/graph/representation/succinct/build_checkpoint.cpp
create mode 100644 metagraph/src/graph/representation/succinct/build_checkpoint.hpp
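The new build_checkpoint.{cpp,hpp} are not included in this excerpt. Judging only from the call sites in this series (checkpoint()/set_checkpoint(), kmer_dir()/set_kmer_dir(), store(), done()), a minimal sketch could look as follows; the two-argument constructor, member layout, and on-disk format are assumptions, not the actual implementation:

#include <cstdint>
#include <filesystem>
#include <fstream>
#include <string>

// Hypothetical sketch of boss::BuildCheckpoint, inferred from its call
// sites in this patch series; the real class and file format may differ.
class BuildCheckpoint {
  public:
    BuildCheckpoint(bool enabled, const std::filesystem::path &dir)
          : enabled_(enabled), file_(dir / "checkpoint.txt") {
        std::ifstream in(file_);
        std::string kmer_dir;
        if (enabled_ && in >> checkpoint_ >> kmer_dir)
            kmer_dir_ = kmer_dir;  // resume from the stored state
    }
    uint32_t checkpoint() const { return checkpoint_; }
    void set_checkpoint(uint32_t c) { checkpoint_ = c; }
    const std::filesystem::path& kmer_dir() const { return kmer_dir_; }
    void set_kmer_dir(std::filesystem::path dir) { kmer_dir_ = std::move(dir); }
    // persist the progress after each completed stage
    void store() const {
        if (!enabled_)
            return;
        std::ofstream out(file_);
        out << checkpoint_ << ' ' << kmer_dir_.string() << '\n';
    }
    // the build finished; drop the checkpoint so the next run starts fresh
    void done() const { std::filesystem::remove(file_); }
  private:
    bool enabled_;
    std::filesystem::path file_;
    uint32_t checkpoint_ = 0;
    std::filesystem::path kmer_dir_;
};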
diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp
index 5b47e297fc..de6ce38b8f 100644
--- a/metagraph/src/cli/build.cpp
+++ b/metagraph/src/cli/build.cpp
@@ -106,7 +106,9 @@ int build_graph(Config *config) {
logger->info("k-mer suffix: '{}'", suffix);
}
- boss::BuildCheckpoint checkpoint(config->checkpoint, config->tmp_dir);
+ bool checkpoint_enabled = !config->tmp_dir.empty() && suffixes.size() == 1;
+ boss::BuildCheckpoint checkpoint(checkpoint_enabled, config->outfbase,
+ config->phase);
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
config->canonical,
@@ -121,16 +123,20 @@ int build_graph(Config *config) {
checkpoint
);
- if (checkpoint.phase() == 0) {
+ if (checkpoint.checkpoint() == 0) {
push_sequences(files, *config, timer, constructor.get());
} else {
logger->info("Skipping parsing sequences from input file(s)");
}
boss::BOSS::Chunk *next_chunk = constructor->build_chunk();
+
+ if (checkpoint.phase() < 2) { // phase 1 stops after generating dummy k-mers
+ assert(next_chunk == nullptr);
+ return 0;
+ }
logger->trace("Graph chunk with {} k-mers was built in {} sec",
next_chunk->size() - 1, timer.elapsed());
-
if (config->suffix.size()) {
logger->info("Serialize the graph chunk for suffix '{}'...", suffix);
timer.reset();
diff --git a/metagraph/src/cli/config/config.cpp b/metagraph/src/cli/config/config.cpp
index 6fe6d9bc84..6206720f04 100644
--- a/metagraph/src/cli/config/config.cpp
+++ b/metagraph/src/cli/config/config.cpp
@@ -334,8 +334,8 @@ Config::Config(int argc, char *argv[]) {
tmp_dir = get_value(i++);
} else if (!strcmp(argv[i], "--disk-cap-gb")) {
disk_cap_bytes = atoi(get_value(i++)) * 1e9;
- } else if (!strcmp(argv[i], "--checkpoint")) {
- checkpoint = true;
+ } else if (!strcmp(argv[i], "--phase")) {
+ phase = atoi(get_value(i++));
} else if (argv[i][0] == '-') {
fprintf(stderr, "\nERROR: Unknown option %s\n\n", argv[i]);
print_usage(argv[0], identity);
@@ -523,21 +523,20 @@ Config::Config(int argc, char *argv[]) {
if (identity == COMPARE && fnames.size() != 2)
print_usage_and_exit = true;
- if (identity != BUILD && checkpoint) {
- std::cerr << "Error: Checkpointing is only supported for disk-based building. "
- "Remove --checkpoint.";
+ if (identity != BUILD && phase != 2) {
+ std::cerr << "Error: Phases are only supported for building. Remove --phase.";
print_usage_and_exit = true;
}
- if (checkpoint && tmp_dir.empty()) {
- std::cerr << "Error: Checkpointing is only supported for disk-based building. "
+ if (phase != 2 && tmp_dir.empty()) {
+ std::cerr << "Error: Phases are only supported for disk-based building. "
"Please set --disk-swap.";
print_usage_and_exit = true;
}
- if (checkpoint && suffix_len > 0) {
- std::cerr << "Error: Checkpointing not supported for multiple suffixes. "
- "Remove --checkpoint or specify each suffix separately using --suffix";
+ if (phase != 2 && suffix_len > 0) {
+ std::cerr << "Error: Phases are not supported for multiple suffixes. "
+ "Remove --phase or specify each suffix separately using --suffix";
print_usage_and_exit = true;
}
@@ -771,7 +770,7 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n");
fprintf(stderr, "\t --disk-swap [STR] \tdirectory to use for temporary files [off]\n");
fprintf(stderr, "\t --disk-cap-gb [INT] \tmax temp disk space to use before forcing a merge, in GB [20]\n");
- fprintf(stderr, "\t --checkpoint \t whether to save intermediate state in --disk-swap in order to resume an interrupted computation [off]\n");
+ fprintf(stderr, "\t --phase [INT] \tmax where to stop the computation (1=generate kmers, 2= build all) [2]\n");
} break;
case CLEAN: {
fprintf(stderr, "Usage: %s clean -o [options] GRAPH\n\n", prog_name.c_str());
diff --git a/metagraph/src/cli/config/config.hpp b/metagraph/src/cli/config/config.hpp
index 9b2fb3764d..c3b97c38c3 100644
--- a/metagraph/src/cli/config/config.hpp
+++ b/metagraph/src/cli/config/config.hpp
@@ -142,7 +142,7 @@ class Config {
size_t disk_cap_bytes = 20e9; // 20GB default
- bool checkpoint = false;
+ uint32_t phase = 2;
enum IdentityType {
NO_IDENTITY = -1,
diff --git a/metagraph/src/common/utils/file_utils.cpp b/metagraph/src/common/utils/file_utils.cpp
index 9d0e7f646c..91f964999c 100644
--- a/metagraph/src/common/utils/file_utils.cpp
+++ b/metagraph/src/common/utils/file_utils.cpp
@@ -38,7 +38,8 @@ void cleanup_tmp_dir_on_exit() {
}
std::filesystem::path create_temp_dir(std::filesystem::path path,
- const std::string &name) {
+ const std::string &name,
+ bool clean_on_exit) {
if (path.empty())
path = "./";
@@ -48,6 +49,10 @@ std::filesystem::path create_temp_dir(std::filesystem::path path,
exit(1);
}
+ if (!clean_on_exit) {
+ return tmp_dir_str;
+ }
+
if (TMP_DIRS.empty()) {
if (std::signal(SIGINT, cleanup_tmp_dir_on_signal) == SIG_ERR)
logger->error("Couldn't reset the signal handler for SIGINT");
diff --git a/metagraph/src/common/utils/file_utils.hpp b/metagraph/src/common/utils/file_utils.hpp
index 1975acc136..8430f743c8 100644
--- a/metagraph/src/common/utils/file_utils.hpp
+++ b/metagraph/src/common/utils/file_utils.hpp
@@ -14,7 +14,8 @@
namespace utils {
std::filesystem::path create_temp_dir(std::filesystem::path path,
- const std::string &name = "");
+ const std::string &name = "",
+ bool clean_on_exit = true);
bool check_if_writable(const std::string &filename);
diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
index 54b03aa369..6ef36a2664 100644
--- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
+++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp
@@ -362,7 +362,7 @@ std::vector<std::string> split(size_t k,
names[i] = dir/("real_F_W_" + std::to_string(i));
}
- if (checkpoint->phase() > 2) {
+ if (checkpoint->checkpoint() > 2) {
logger->info("Skipping splitting k-mers into chunks");
return names;
}
@@ -384,7 +384,7 @@ std::vector split(size_t k,
std::for_each(sinks.begin(), sinks.end(), [](auto &f) { f.finish(); });
logger->trace("Total number of real k-mers: {}", num_kmers);
- checkpoint->set_phase(3);
+ checkpoint->set_checkpoint(3);
checkpoint->store();
return names;
@@ -414,7 +414,7 @@ concatenate_chunks(const std::filesystem::path &dir,
real_split_by_W[W] = dir/("real_split_by_W_" + std::to_string(W));
}
- if (checkpoint->phase() > 4) {
+ if (checkpoint->checkpoint() > 4) {
return { real_split_by_W, dummy_sink_name };
}
@@ -443,7 +443,7 @@ concatenate_chunks(const std::filesystem::path &dir,
std::filesystem::remove(name);
}
- checkpoint->set_phase(5);
+ checkpoint->set_checkpoint(5);
checkpoint->store();
return { real_split_by_W, dummy_sink_name };
}
@@ -480,7 +480,7 @@ generate_dummy_1_kmers(size_t k,
dummy_sink_names[i] = dir/("dummy_sink_" + std::to_string(i));
}
- if (checkpoint->phase() > 3) {
+ if (checkpoint->checkpoint() > 3) {
logger->info("Skipping generating dummy-1 source k-mers and dummy sink kmers");
return { dummy_sink_names, real_F_W };
}
@@ -559,7 +559,7 @@ generate_dummy_1_kmers(size_t k,
logger->trace("Generated {} dummy sink and {} dummy source k-mers", num_sink,
num_source);
- checkpoint->set_phase(4);
+ checkpoint->set_checkpoint(4);
checkpoint->store();
return { dummy_sink_names, real_F_W };
@@ -576,7 +576,7 @@ void add_reverse_complements(size_t k,
ThreadPool& async_worker,
ChunkedWaitQueue *kmers,
BuildCheckpoint *checkpoint) {
- if (checkpoint->phase() > 2) {
+ if (checkpoint->checkpoint() > 2) {
logger->info("Skipping generating reverse complements");
return;
}
@@ -584,7 +584,7 @@ void add_reverse_complements(size_t k,
std::unique_ptr<common::SortedSetDisk<T_INT_REAL>> rc_set;
std::vector<std::string> to_merge = { dir/"original" };
- if (checkpoint->phase() == 2) {
+ if (checkpoint->checkpoint() == 2) {
logger->info(
"Continuing from checkpoint phase 2. Looking for 'original' and "
"'rc/chunk_*' in {}",
@@ -611,7 +611,7 @@ void add_reverse_complements(size_t k,
checkpoint->kmer_dir());
std::exit(1);
}
- } else { // checkpoint->phase() < 2
+ } else { // checkpoint->checkpoint() < 2
std::string rc_dir = dir/"rc";
std::filesystem::create_directory(rc_dir);
rc_set = std::make_unique<common::SortedSetDisk<T_INT_REAL>>(
@@ -650,7 +650,7 @@ void add_reverse_complements(size_t k,
to_merge.insert(to_merge.end(), to_insert.begin(), to_insert.end());
rc_set->clear(dir, false /* don't delete chunk files! */);
original.finish();
- checkpoint->set_phase(2);
+ checkpoint->set_checkpoint(2);
checkpoint->store();
}
@@ -689,10 +689,10 @@ template
using KMER = get_first_type_t<T>; // 64/128/256-bit KmerBOSS with sentinel $ (on 3 bits)
using KMER_INT = typename KMER::WordType; // the 64/128/256-bit integer in KMER
- uint32_t previous_phase = checkpoint->phase();
- if (checkpoint->phase() == 0) {
+ uint32_t previous_phase = checkpoint->checkpoint();
+ if (checkpoint->checkpoint() == 0) {
checkpoint->set_kmer_dir(kmer_collector.tmp_dir());
- checkpoint->set_phase(1);
+ checkpoint->set_checkpoint(1);
checkpoint->store();
}
@@ -757,7 +757,7 @@ template
}
dummy_chunk_names.push_back(dummy_sink_name);
- if (checkpoint->phase() < 6) {
+ if (checkpoint->checkpoint() < 6) {
// generate dummy k-mers of prefix length 1..k
logger->trace("Starting generating dummy-1..{} source k-mers...", k);
for (size_t dummy_pref_len = 1; dummy_pref_len < k; ++dummy_pref_len) {
@@ -794,7 +794,7 @@ template
dummy_pref_len, num_kmers);
}
- checkpoint->set_phase(6);
+ checkpoint->set_checkpoint(6);
checkpoint->store();
} else {
logger->info("Skipping generating dummy-1..{} source k-mers", k);
@@ -903,7 +903,7 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
tmp_dir,
max_disk_space,
both_strands_mode && filter_suffix.empty() /* keep only canonical k-mers */),
- bits_per_count_(bits_per_count), checkpoint_(checkpoint) {
+ bits_per_count_(bits_per_count), checkpoint_(checkpoint), tmp_dir_(tmp_dir) {
if (filter_suffix.size()
&& filter_suffix == std::string(filter_suffix.size(), BOSS::kSentinel)) {
kmer_collector_.add_kmer(std::vector<TAlphabet>(k + 1, BOSS::kSentinelCode));
@@ -926,6 +926,25 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor {
return kmer_collector_.tmp_dir();
}
+ template <typename Container>
+ BOSS::Chunk *build_chunk_2bit(Container &kmers) {
+ logger->trace("Reconstructing all required dummy source k-mers...");
+
+ Timer timer;
+ ChunkedWaitQueue