
Add checkpointing to metagraph build #197

Draft · wants to merge 59 commits into base: master

Commits (59)
8567bff
Intermediate
danieldanciu Sep 4, 2020
97294cd
First attempt
danieldanciu Sep 5, 2020
1f731ff
Running'
danieldanciu Sep 6, 2020
7e981ba
Clear checkpoint
danieldanciu Sep 6, 2020
b9fbc53
Small
danieldanciu Sep 7, 2020
d83368d
Add --phase
danieldanciu Sep 7, 2020
0b27107
Working checkpointing
danieldanciu Sep 8, 2020
7d53b52
Added functional tests
danieldanciu Sep 8, 2020
7476e7a
Small changes self review
danieldanciu Sep 8, 2020
fa07b12
Add integration tests for parallel building
danieldanciu Sep 9, 2020
8301c22
Remove forgotten optnone
danieldanciu Sep 10, 2020
71ac989
Minor rename
danieldanciu Sep 10, 2020
12e1ada
Support filesystem
danieldanciu Sep 10, 2020
da1aea6
Mor elogging
danieldanciu Sep 15, 2020
bc63e50
small
danieldanciu Sep 15, 2020
0756a42
small
danieldanciu Sep 15, 2020
a2e6c0f
small
danieldanciu Sep 15, 2020
15b42da
Don't clean up unmerged files
danieldanciu Sep 15, 2020
8a9121b
Don't clean up unmerged files in SortedSetDisk, so that computation c…
danieldanciu Sep 15, 2020
9fabdb5
10 chunks
danieldanciu Sep 15, 2020
a4fb98c
Wait for merging before stopping
danieldanciu Sep 17, 2020
be4d767
Small fix
danieldanciu Sep 17, 2020
89d1588
Actually wait for merge to happen
danieldanciu Sep 17, 2020
4c73921
Write checkpoint after phase1
danieldanciu Sep 18, 2020
6b0b9d9
Address review comments
danieldanciu Sep 19, 2020
caaf272
Address missed comments
danieldanciu Sep 19, 2020
5dfcee8
Merged phase
danieldanciu Sep 19, 2020
62ec669
First checkpoint, then delete
danieldanciu Sep 20, 2020
9cf55a9
First checkpoint, then delete
danieldanciu Sep 20, 2020
88748b9
Merge branch 'phase' into phase2
danieldanciu Oct 12, 2020
2792ab0
Added some logging
danieldanciu Oct 12, 2020
a40dea5
Merge branch 'phase2' into phase
danieldanciu Oct 12, 2020
7600830
Addressed review comments
danieldanciu Oct 12, 2020
c4c43ec
minor
danieldanciu Oct 12, 2020
007845f
merged with dev
danieldanciu Oct 18, 2020
866e786
Remove double declaration
danieldanciu Oct 19, 2020
d52ca9e
Merged with dev
danieldanciu Oct 22, 2020
5a3a8a3
Default to phase 3
danieldanciu Oct 22, 2020
ad6d965
Skip phase 2 if no rc's are generated
danieldanciu Oct 22, 2020
6493adb
Flush sorted set at end of phase
danieldanciu Oct 23, 2020
57c111c
Better temp dir
danieldanciu Oct 24, 2020
e6c0d95
Don't push kmers into queue if phase is < 3
danieldanciu Oct 24, 2020
7cd5b63
Merged with dev
danieldanciu Nov 23, 2020
80f1e48
Set checkpoint to 2 when RC's are not being generated
danieldanciu Nov 23, 2020
de1173c
Clean up temp files in SSD
danieldanciu Nov 23, 2020
fb0ad63
Merge remote-tracking branch 'origin/dev' into phase
danieldanciu Nov 24, 2020
5e42d10
Remove trace logs
danieldanciu Nov 24, 2020
d728edd
s/remove/remove_all
danieldanciu Nov 24, 2020
79d97e4
Merged with dev
danieldanciu Dec 14, 2020
03d5b2d
Simplify test_build_phase
danieldanciu Dec 14, 2020
e4aed78
Small fix in checkpoint continuation
danieldanciu Dec 14, 2020
82a9e8e
Verbose mode for test_build_phase
danieldanciu Dec 16, 2020
76c0d1b
A bit more debugging
danieldanciu Dec 16, 2020
2a5b52f
All trace logs
danieldanciu Dec 18, 2020
dc278db
Reset kmers when continuing from cp 1
danieldanciu Dec 20, 2020
98ad5da
Skip phase 2
danieldanciu Dec 20, 2020
8c953a6
Copy file names
danieldanciu Dec 20, 2020
c238b20
Acquire lock when flushing
danieldanciu Dec 20, 2020
0d399b7
Count orig/rc
danieldanciu Dec 21, 2020
70 changes: 70 additions & 0 deletions metagraph/integration_tests/test_build.py
@@ -337,6 +337,76 @@ def test_build_chunks_from_kmc_canonical(self, build):
self.assertEqual('nodes (k): 802920', params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])

@parameterized.expand(['succinct_disk'])
def test_build_phase(self, build):
representation, tmp_dir = build_params[build]

construct_command = '{exe} build --phase 1 --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint'))

construct_command = '{exe} build --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

res = self.__get_stats(self.tempdir.name + '/graph' + graph_file_extension[representation])
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): 1159851', params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])

# tests that we can build and resume 2 separate graphs on the same machine
@parameterized.expand(['succinct_disk'])
def test_build_phase_parallel(self, build):
representation, tmp_dir = build_params[build]

for name in ('graph1', 'graph2'):
construct_command = '{exe} build --phase 1 --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/' + name,
input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa')
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/' + name + '.checkpoint'))

for name in ('graph1', 'graph2'):
construct_command = '{exe} build --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/' + name,
input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa')
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

res = self.__get_stats(self.tempdir.name + '/' + name + graph_file_extension[representation])
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): ' + ('1159851' if name == 'graph1' else '91584'), params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])

if __name__ == '__main__':
unittest.main()
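The two-stage flow these tests exercise — build with `--phase 1`, assert that `<outfile>.checkpoint` appears, then rerun without `--phase` to finish — can be sketched as a minimal checkpoint mechanism. The JSON file format, field names, and `build()` helper below are illustrative assumptions, not the PR's actual on-disk format:

```python
import json
import os
import tempfile


class BuildCheckpoint:
    """Toy model of the checkpointing the tests above exercise:
    phase 1 writes a .checkpoint file next to the output, and a
    later run reads it to skip completed work. JSON is an
    assumption; the real format in the PR is not shown here."""

    def __init__(self, outfbase):
        self.path = outfbase + '.checkpoint'
        self.phase_done = 0
        if os.path.exists(self.path):
            with open(self.path) as f:
                self.phase_done = json.load(f)['phase_done']

    def complete(self, phase):
        # record that this phase finished, surviving a crash/restart
        self.phase_done = phase
        with open(self.path, 'w') as f:
            json.dump({'phase_done': phase}, f)

    def done(self):
        # a finished build leaves no checkpoint behind
        os.remove(self.path)


def build(outfbase, stop_phase=2):
    cp = BuildCheckpoint(outfbase)
    if cp.phase_done < 1:
        pass  # phase 1: parse sequences, generate k-mers
    cp.complete(1)
    if stop_phase < 2:
        return 'stopped after phase 1'
    pass  # phase 2: build the full graph
    cp.done()
    return 'graph built'
```

The first call stops after phase 1 and leaves the checkpoint in place; the second call resumes, finishes, and removes it, mirroring the `os.path.isfile(... + '.checkpoint')` assertions above.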
36 changes: 36 additions & 0 deletions metagraph/integration_tests/test_build_weighted.py
@@ -373,5 +373,41 @@ def test_kmer_count_width_large(self, build, k_width_result):
self.assertEqual('avg weight: {}'.format(avg_count_expected), params_str[4])


def test_build_phase(self):
construct_command = '{exe} build --phase 1 --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} ' \
'--count-width 16 -o {outfile} {input}'.format(
exe=METAGRAPH,
tmp_dir=self.tempdir.name,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint'))

construct_command = '{exe} build --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} --count-width 16 ' \
'-o {outfile} {input}'.format(
exe=METAGRAPH,
tmp_dir=self.tempdir.name,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

stats_command = '{exe} stats {graph}'.format(
exe=METAGRAPH,
graph=self.tempdir.name + '/graph.dbg',
)
res = subprocess.run(stats_command.split(), stdout=PIPE)
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): 591997', params_str[1])
self.assertEqual('canonical mode: no', params_str[2])
self.assertEqual('nnz weights: 591997', params_str[3])
self.assertEqual('avg weight: 2.48587', params_str[4])


if __name__ == '__main__':
unittest.main()
20 changes: 17 additions & 3 deletions metagraph/src/cli/build.cpp
@@ -11,6 +11,7 @@
#include "graph/representation/bitmap/dbg_bitmap_construct.hpp"
#include "graph/representation/succinct/dbg_succinct.hpp"
#include "graph/representation/succinct/boss_construct.hpp"
#include "graph/representation/succinct/build_checkpoint.hpp"
#include "graph/graph_extensions/node_weights.hpp"
#include "config/config.hpp"
#include "parse_sequences.hpp"
@@ -105,6 +106,9 @@ int build_graph(Config *config) {
logger->info("k-mer suffix: '{}'", suffix);
}

bool checkpoint_enabled = !config->tmp_dir.empty() && suffixes.size() == 1;
boss::BuildCheckpoint checkpoint(checkpoint_enabled, config->outfbase,
config->phase);
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
config->canonical,
@@ -115,15 +119,24 @@
config->tmp_dir.empty() ? kmer::ContainerType::VECTOR
: kmer::ContainerType::VECTOR_DISK,
config->tmp_dir,
config->disk_cap_bytes
config->disk_cap_bytes,
checkpoint
);

push_sequences(files, *config, timer, constructor.get());
if (checkpoint.checkpoint() == 0) {
push_sequences(files, *config, timer, constructor.get());
} else {
logger->info("Skipping parsing sequences from input file(s)");
}

boss::BOSS::Chunk *next_chunk = constructor->build_chunk();

if (checkpoint.phase() < 2) { // phase 1 stops after generating dummy k-mers
assert(next_chunk == nullptr);
return 0;
}
logger->trace("Graph chunk with {} k-mers was built in {} sec",
next_chunk->size() - 1, timer.elapsed());

if (config->suffix.size()) {
logger->info("Serialize the graph chunk for suffix '{}'...", suffix);
timer.reset();
Expand All @@ -140,6 +153,7 @@ int build_graph(Config *config) {
} else {
graph_data.reset(next_chunk);
}
checkpoint.done();
}

assert(graph_data);
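The control flow added to `build_graph()` above — skip sequence parsing when resuming from a checkpoint, and return early when `--phase 1` stops the build after dummy-k-mer generation — can be sketched as follows. The callables and return values are illustrative stand-ins for the real constructor pipeline, not the PR's API:

```python
def build_graph(phase_done, stop_phase, push_sequences, build_chunk):
    """Sketch of the phase-gated flow in build.cpp: phase_done comes
    from the checkpoint file, stop_phase from --phase."""
    log = []
    if phase_done == 0:
        push_sequences()  # parse sequences from the input files
        log.append('parsed input')
    else:
        log.append('skipped parsing')
    chunk = build_chunk()
    if stop_phase < 2:
        # phase 1 stops after generating dummy k-mers, so no chunk yet
        assert chunk is None
        log.append('stopped after phase 1')
        return log
    log.append(f'built chunk with {chunk} k-mers')
    return log
```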
22 changes: 22 additions & 0 deletions metagraph/src/cli/config/config.cpp
@@ -334,6 +334,8 @@ Config::Config(int argc, char *argv[]) {
tmp_dir = get_value(i++);
} else if (!strcmp(argv[i], "--disk-cap-gb")) {
disk_cap_bytes = atoi(get_value(i++)) * 1e9;
} else if (!strcmp(argv[i], "--phase")) {
phase = atoi(get_value(i++));
} else if (argv[i][0] == '-') {
fprintf(stderr, "\nERROR: Unknown option %s\n\n", argv[i]);
print_usage(argv[0], identity);
@@ -521,6 +523,25 @@ Config::Config(int argc, char *argv[]) {
if (identity == COMPARE && fnames.size() != 2)
print_usage_and_exit = true;

if (identity != BUILD && phase != 2) {
std::cerr << "Error: Phases are only supported for building. Remove --phase"
<< std::endl;
print_usage_and_exit = true;
}

if (phase != 2 && tmp_dir.empty()) {
std::cerr << "Error: Phases are only supported for disk-based building. "
"Please set --disk-swap." << std::endl;
print_usage_and_exit = true;
}

if (phase != 2 && suffix_len > 0) {
std::cerr << "Error: Phases are not supported for multiple suffixes. Remove "
"--phase or specify each suffix separately using --suffix"
<< std::endl;
print_usage_and_exit = true;
}

if (discovery_fraction < 0 || discovery_fraction > 1)
print_usage_and_exit = true;

@@ -751,6 +772,7 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n");
fprintf(stderr, "\t --disk-swap [STR] \tdirectory to use for temporary files [off]\n");
fprintf(stderr, "\t --disk-cap-gb [INT] \tmax temp disk space to use before forcing a merge, in GB [20]\n");
fprintf(stderr, "\t --phase [INT] \tphase at which to stop the computation (1: generate k-mers, 2: complete build) [2]\n");
} break;
case CLEAN: {
fprintf(stderr, "Usage: %s clean -o <outfile-base> [options] GRAPH\n\n", prog_name.c_str());
2 changes: 2 additions & 0 deletions metagraph/src/cli/config/config.hpp
@@ -142,6 +142,8 @@ class Config {

size_t disk_cap_bytes = 20e9; // 20GB default

uint32_t phase = 2; // build phase; 1 = generate kmers, 2 = complete build

enum IdentityType {
NO_IDENTITY = -1,
BUILD = 1,
14 changes: 7 additions & 7 deletions metagraph/src/common/elias_fano.cpp
@@ -14,29 +14,29 @@
namespace mtg {
namespace common {

void concat(const std::vector<std::string> &files, const std::string &result) {
std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result) {
if (files.empty())
return;
return {};
std::vector<std::string> original_files;

std::vector<std::string> suffixes = { "", ".up" };
if (std::filesystem::exists(files[0] + ".count"))
suffixes.push_back(".count");

for (const std::string &suffix : suffixes) {
std::string concat_command = "cat ";
for (uint32_t i = 1; i < files.size(); ++i) {
for (uint32_t i = 0; i < files.size(); ++i) {
concat_command += files[i] + suffix + " ";
}
concat_command += " >> " + files[0] + suffix;

concat_command += " > " + result + suffix;
if (std::system(concat_command.c_str()))
throw std::runtime_error("Error while cat-ing files: " + concat_command);

std::filesystem::rename(files[0] + suffix, result + suffix);
for (const std::string &f : files) {
std::filesystem::remove(f + suffix);
original_files.push_back(f + suffix);
}
}
return original_files;
}
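The revised `concat()` above no longer appends everything into `files[0]` and deletes the inputs; it writes into `result` and returns the input names so the caller decides when deletion is safe. A sketch of the same semantics in Python, simplified to the unconditional `""` and `".up"` suffixes (the optional `.count` companion is handled analogously):

```python
import shutil


def concat(files, result, suffixes=('', '.up')):
    """Concatenate each input file (and its companion suffix files)
    into result + suffix; keep the inputs intact and return their
    names so the caller can delete them once it is safe to do so."""
    if not files:
        return []
    originals = []
    for suffix in suffixes:
        with open(result + suffix, 'wb') as out:
            for f in files:
                with open(f + suffix, 'rb') as src:
                    shutil.copyfileobj(src, out)
                originals.append(f + suffix)
    return originals
```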

template <class T, class Enable = void>
2 changes: 1 addition & 1 deletion metagraph/src/common/elias_fano.hpp
@@ -21,7 +21,7 @@ namespace common {
* The files store data that is ordered and the values in a file are smaller than the
* values in the next file.
*/
void concat(const std::vector<std::string> &files, const std::string &result);
std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result);

/**
* Elias-Fano encoder that streams the encoded result into a file.
12 changes: 7 additions & 5 deletions metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
@@ -64,13 +64,15 @@ std::vector<std::string> SortedSetDiskBase<T>::files_to_merge() {
}

template <typename T>
void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path) {
void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path, bool remove_files) {
std::unique_lock<std::mutex> exclusive_lock(mutex_);
std::unique_lock<std::shared_timed_mutex> multi_insert_lock(multi_insert_mutex_);
is_merging_ = false;
// remove the files that have not been requested to merge
for (const auto &chunk_file : get_file_names()) {
std::filesystem::remove(chunk_file);
if (remove_files) {
// remove the files that have not been requested to merge
for (const auto &chunk_file : get_file_names()) {
std::filesystem::remove(chunk_file);
}
}
chunk_count_ = 0;
l1_chunk_count_ = 0;
@@ -91,7 +93,7 @@ void SortedSetDiskBase<T>::start_merging_async() {
async_worker_.enqueue([file_names, this]() {
std::function<void(const T &)> on_new_item
= [this](const T &v) { merge_queue_.push(v); };
merge_files(file_names, on_new_item);
merge_files(file_names, on_new_item, false);
Review comment (Member): Who is responsible for cleaning these files up? Does it mean that it never removes the old temp files until all the k-mers are sorted, and thus that the disk usage grows a lot?

Reply (danieldanciu, Author): The only files affected by this change are the original collected chunks. These will indeed be deleted after checkpoint 5 instead of after checkpoint 2. Why not delete them now? Because if the program crashes between merge_queue_.shutdown() (so the merge queue still has elements to merge, but no more new elements will be added) and the queue being emptied, then we lose all data: the old chunks have been deleted, but the new merge is not yet ready. The timespan between the files being deleted and the merge queue being emptied is quite short, so the probability of this happening is low, but I cannot leave this flaw in the code with a clear conscience. Since recover_dummy_kmers in boss_chunk_construct doesn't receive a reference to this object, but only to the ChunkedWaitQueue it creates, it cannot even manually call clear() on it when the merge is done.

merge_queue_.shutdown();
});
}
2 changes: 1 addition & 1 deletion metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp
@@ -74,7 +74,7 @@ class SortedSetDiskBase {
* sorted set may be expensive when #data_ is large. In these cases, prefer calling
* #clear and re-using the buffer.
*/
void clear(const std::filesystem::path &tmp_path = "/tmp/");
void clear(const std::filesystem::path &tmp_path = "/tmp/", bool remove_files = true);

protected:
/** Advances #it by step or points to #end, whichever comes first. */
11 changes: 8 additions & 3 deletions metagraph/src/common/utils/file_utils.cpp
@@ -38,7 +38,8 @@ void cleanup_tmp_dir_on_exit() {
}

std::filesystem::path create_temp_dir(std::filesystem::path path,
const std::string &name) {
const std::string &name,
bool clean_on_exit) {
if (path.empty())
path = "./";

@@ -48,6 +49,12 @@ std::filesystem::path create_temp_dir(std::filesystem::path path,
exit(1);
}

logger->trace("Created temporary directory {}", tmp_dir_str);

if (!clean_on_exit) {
return tmp_dir_str;
}

if (TMP_DIRS.empty()) {
if (std::signal(SIGINT, cleanup_tmp_dir_on_signal) == SIG_ERR)
logger->error("Couldn't reset the signal handler for SIGINT");
@@ -57,8 +64,6 @@ std::filesystem::path create_temp_dir(std::filesystem::path path,
logger->error("Couldn't reset the atexit handler");
}

logger->trace("Registered temporary directory {}", tmp_dir_str);

static std::mutex mu;
std::lock_guard<std::mutex> lock(mu);

3 changes: 2 additions & 1 deletion metagraph/src/common/utils/file_utils.hpp
@@ -14,7 +14,8 @@
namespace utils {

std::filesystem::path create_temp_dir(std::filesystem::path path,
const std::string &name = "");
const std::string &name = "",
bool clean_on_exit = true);


bool check_if_writable(const std::string &filename);