diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/main.yml
similarity index 95%
rename from .github/workflows/c-cpp.yml
rename to .github/workflows/main.yml
index 3d99fe1..aaac384 100644
--- a/.github/workflows/c-cpp.yml
+++ b/.github/workflows/main.yml
@@ -1,12 +1,8 @@
-name: C/C++ CI
+name: GitHub Actions CI
 
 on:
   push:
-    branches: [ master, v2, feature/tests ]
-    paths-ignore:
-      - '**.md'
-  pull_request:
-    branches: [ master, v2, feature/tests ]
+    branches: [ master, v2 ]
     paths-ignore:
       - '**.md'
   workflow_dispatch:
@@ -22,7 +18,7 @@ jobs:
     steps:
     - uses: actions/checkout@v2
     - name: make
-      run: make
+      run: make -j2
 
     - name: build
       run: |
diff --git a/.github/workflows/self-hosted.yml b/.github/workflows/self-hosted.yml
new file mode 100644
index 0000000..7113a87
--- /dev/null
+++ b/.github/workflows/self-hosted.yml
@@ -0,0 +1,148 @@
+name: Self-hosted CI
+
+on:
+  pull_request:
+    branches: [ master, v2 ]
+    paths-ignore:
+      - '**.md'
+  workflow_dispatch:
+
+jobs:
+
+  ########################################################################################
+  checkout:
+    name: checkout
+    runs-on: [self-hosted, kmer-db]
+
+    steps:
+    - uses: actions/checkout@v2
+
+
+  ########################################################################################
+  make-tests:
+    name: make
+    runs-on: [self-hosted, kmer-db]
+    needs: checkout
+    strategy:
+      fail-fast: false
+      matrix:
+        compiler: [9, 10, 11]
+
+
+    steps:
+    - name: make (g++-${{matrix.compiler}})
+      run: |
+        make -j32 CXX=g++-${{matrix.compiler}}
+        cp ./kmer-db ./kmer-db-${{matrix.compiler}}
+        make clean
+
+  ########################################################################################
+  main-test:
+    name: Main tests
+    runs-on: [self-hosted, kmer-db]
+    needs: make-tests
+    strategy:
+      fail-fast: false
+      matrix:
+        compiler: [9, 10, 11]
+        threads: [1, 2, 16, 0]
+    env:
+      INPUT_DIR: ./test/virus
+
+    steps:
+
+    - name: build
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} ${INPUT_DIR}/seqs.part1.list k18.parts.db
+
+    - name: new2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} new2all -t ${{matrix.threads}} k18.parts.db ${INPUT_DIR}/seqs.part2.list k18.n2a.csv
+        cmp k18.n2a.csv ${INPUT_DIR}/k18.n2a.csv
+
+    - name: new2all (sparse)
+      run: |
+        ./kmer-db-${{matrix.compiler}} new2all -t ${{matrix.threads}} -sparse k18.parts.db ${INPUT_DIR}/seqs.part2.list k18.n2a.sparse.csv
+        cmp k18.n2a.sparse.csv ${INPUT_DIR}/k18.n2a.sparse.csv
+
+    - name: extend
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -extend -k 25 ${INPUT_DIR}/seqs.part2.list k18.parts.db
+
+    - name: all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.parts.db k18.csv
+        cmp k18.csv ${INPUT_DIR}/k18.csv
+
+    - name: all2all (sparse)
+      run: |
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} -sparse k18.parts.db k18.sparse.csv
+        cmp k18.sparse.csv ${INPUT_DIR}/k18.sparse.csv
+
+    - name: distance
+      run: |
+        ./kmer-db-${{matrix.compiler}} distance jaccard min max cosine mash k18.csv
+        cmp k18.csv.jaccard ${INPUT_DIR}/k18.csv.jaccard
+        cmp k18.csv.min ${INPUT_DIR}/k18.csv.min
+        cmp k18.csv.max ${INPUT_DIR}/k18.csv.max
+        cmp k18.csv.cosine ${INPUT_DIR}/k18.csv.cosine
+        cmp k18.csv.mash ${INPUT_DIR}/k18.csv.mash
+
+    - name: build (default k) + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} ${INPUT_DIR}/seqs.list k18.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.db k18.csv
+        cmp k18.csv ${INPUT_DIR}/k18.csv
+
+    - name: build (default k, multifasta) + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -multisample-fasta ${INPUT_DIR}/multi.list k18.multi.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.multi.db k18.multi.csv
+        cmp k18.multi.csv ${INPUT_DIR}/k18.csv
+
+    - name: build (default k, 2 x multifasta) + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -multisample-fasta ${INPUT_DIR}/multi.split.list k18.multi.split.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.multi.split.db k18.multi.split.csv
+        cmp k18.multi.split.csv ${INPUT_DIR}/k18.csv
+
+    - name: build (default k) + extend + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} ${INPUT_DIR}/seqs.part1.list k18.parts.db
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -extend -k 25 ${INPUT_DIR}/seqs.part2.list k18.parts.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.parts.db k18.parts.csv
+        cmp k18.parts.csv ${INPUT_DIR}/k18.csv
+
+    - name: build (default k, fraction 0.1) + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -f 0.1 ${INPUT_DIR}/seqs.list k18.frac.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.frac.db k18.frac.csv
+        cmp k18.frac.csv ${INPUT_DIR}/k18.frac.csv
+
+    - name: minhash (default k, fraction 0.1) + build + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} minhash 0.1 ${INPUT_DIR}/seqs.list
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -from-minhash -k 25 ${INPUT_DIR}/seqs.list k18.minhash.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k18.minhash.db k18.minhash.csv
+        cmp k18.minhash.csv ${INPUT_DIR}/k18.frac.csv
+
+    - name: build (k=24) + all2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -k 24 ${INPUT_DIR}/seqs.list k24.db
+        ./kmer-db-${{matrix.compiler}} all2all -t ${{matrix.threads}} k24.db k24.csv
+        cmp k24.csv ${INPUT_DIR}/k24.csv
+
+    - name: build (k=25, f=0.1) + one2all
+      run: |
+        ./kmer-db-${{matrix.compiler}} build -t ${{matrix.threads}} -k 25 -f 0.1 ${INPUT_DIR}/seqs.part1.list k25.db
+        ./kmer-db-${{matrix.compiler}} one2all -t ${{matrix.threads}} k25.db ${INPUT_DIR}/data/MT159713 MT159713.csv
+        cmp MT159713.csv ${INPUT_DIR}/MT159713.csv
+
+    - name: new2all (against itself)
+      run: |
+        ./kmer-db-${{matrix.compiler}} new2all -t ${{matrix.threads}} k18.db ${INPUT_DIR}/seqs.list k18.n2a.itself.csv
+        cmp k18.n2a.itself.csv ${INPUT_DIR}/k18.n2a.itself.csv
+
+
+
diff --git a/README.md b/README.md
index 8d00a32..2d89466 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 # Kmer-db
 
 [![GitHub downloads](https://img.shields.io/github/downloads/refresh-bio/kmer-db/total.svg?style=flag&label=GitHub%20downloads)](https://github.com/refresh-bio/kmer-db/releases)
 [![Bioconda downloads](https://img.shields.io/conda/dn/bioconda/kmer-db.svg?style=flag&label=Bioconda%20downloads)](https://anaconda.org/bioconda/kmer-db)
-[![C/C++ CI](https://github.com/refresh-bio/kmer-db/workflows/C/C++%20CI/badge.svg)](https://github.com/refresh-bio/kmer-db/actions)
+[![GitHub Actions CI](../../actions/workflows/main.yml/badge.svg)](../../actions/workflows/main.yml)
 [![License](https://anaconda.org/bioconda/famsa/badges/license.svg)](https://www.gnu.org/licenses/gpl-3.0.html)
 
diff --git a/makefile b/makefile
index 847dbf0..e21066e 100644
--- a/makefile
+++ b/makefile
@@ -35,8 +35,6 @@ else
 	EXTRA_LIBS_DIR = ""
 endif
 
-
-CC 	= g++
 LDFLAGS += 
 CFLAGS	+= -Wall -O3 -m64 -std=c++11 $(OMP_FLAGS) -pthread
 CFLAGS_AVX2	+= $(CFLAGS) -mavx2 -I $(KMER_DB_LIBS_DIR) -I $(EXTRA_LIBS_DIR)
@@ -60,35 +58,35 @@ OBJS := $(KMER_DB_MAIN_DIR)/analyzer.o \
 	$(KMER_DB_LIBS_DIR)/mmer.o
 
 $(KMER_DB_MAIN_DIR)/parallel_sorter.o: $(KMER_DB_MAIN_DIR)/parallel_sorter.cpp
-	$(CC) -O3 -mavx -m64 -std=c++11 -pthread $(OMP_FLAGS) -c $< -o $@
+	$(CXX) -O3 -mavx -m64 -std=c++11 -pthread $(OMP_FLAGS) -c $< -o $@
 
 ifeq ($(HAVE_AVX2),)
 ## no avx2 support
 AVX_OBJS := $(KMER_DB_MAIN_DIR)/row_add_avx.o
 
 $(KMER_DB_MAIN_DIR)/row_add_avx.o: $(KMER_DB_MAIN_DIR)/row_add_avx.cpp
-	$(CC) $(CFLAGS) -DNO_AVX2 -c $< -o $@
+	$(CXX) $(CFLAGS) -DNO_AVX2 -c $< -o $@
 else
 # with avx2 support
 AVX_OBJS := $(KMER_DB_MAIN_DIR)/row_add_avx.o \
 	$(KMER_DB_MAIN_DIR)/row_add_avx2.o
 
 $(KMER_DB_MAIN_DIR)/row_add_avx.o: $(KMER_DB_MAIN_DIR)/row_add_avx.cpp
-	$(CC) $(CFLAGS) -c $< -o $@
+	$(CXX) $(CFLAGS) -c $< -o $@
 
 $(KMER_DB_MAIN_DIR)/row_add_avx2.o: $(KMER_DB_MAIN_DIR)/row_add_avx2.cpp
-	$(CC) $(CFLAGS_AVX2) -c $< -o $@
+	$(CXX) $(CFLAGS_AVX2) -c $< -o $@
 endif
 
 %.o: %.cpp
-	$(CC) $(CFLAGS) -c $< -o $@
+	$(CXX) $(CFLAGS) -c $< -o $@
 
 ifeq ($(INTERNAL_ZLIB),true)
 kmer-db: $(OBJS) $(AVX_OBJS)
-	$(CC) $(CLINK) $(LDFLAGS) -o $(KMER_DB_ROOT_DIR)/$@ $(OBJS) $(AVX_OBJS) $(EXTRA_LIBS_DIR)/libz.a
+	$(CXX) $(CLINK) $(LDFLAGS) -o $(KMER_DB_ROOT_DIR)/$@ $(OBJS) $(AVX_OBJS) $(EXTRA_LIBS_DIR)/libz.a
 else
 kmer-db: $(OBJS) $(AVX_OBJS)
-	$(CC) $(CLINK) $(LDFLAGS) -o $(KMER_DB_ROOT_DIR)/$@ $(OBJS) $(AVX_OBJS) -lz
+	$(CXX) $(CLINK) $(LDFLAGS) -o $(KMER_DB_ROOT_DIR)/$@ $(OBJS) $(AVX_OBJS) -lz
 endif
 
 clean:
diff --git a/src/console.cpp b/src/console.cpp
index 7a99a4c..ebe743b 100644
--- a/src/console.cpp
+++ b/src/console.cpp
@@ -256,18 +256,18 @@ int Console::runMinHash(const std::string& multipleKmcSamples, InputFile::Format
 
 	std::chrono::duration<double> loadingTime{ 0 }, processingTime{ 0 };
 
-	LOG_DEBUG << "Creating Loader object..." << endl;
+	LOG_DEBUG << "Creating Loader object..." << endl ;
 
 	auto filter = std::make_shared<MinHashFilter>(fraction, 0, kmerLength);
 
 	LoaderEx loader(filter, inputFormat, numReaderThreads, numThreads, multisampleFasta);
 	loader.configure(multipleKmcSamples);
 
-	LOG_DEBUG << "Starting loop..." << endl;
+	LOG_DEBUG << "Starting loop..." << endl ;
 	auto totalStart = std::chrono::high_resolution_clock::now();
 	for (int i = 0; !loader.isCompleted(); ++i) {
 		auto partialTime = std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - totalStart);
-		LOG_VERBOSE << "Processing time: " << partialTime.count() << ", loader buffers: " << (loader.getBytes() >> 20) << " MB" << endl;
+		LOG_VERBOSE << "Processing time: " << partialTime.count() << ", loader buffers: " << (loader.getBytes() >> 20) << " MB" << endl ;
 
 		auto task = loader.popTask(i);
@@ -302,7 +302,7 @@ int Console::runBuildDatabase(
 	InputFile::Format inputFormat,
 	bool extendDb){
 
-	LOG_DEBUG << "Creating PrefixKmerDb object" << endl;
+	LOG_DEBUG << "Creating PrefixKmerDb object" << endl ;
 	AbstractKmerDb* db = new PrefixKmerDb(numThreads);
 	std::shared_ptr<AbstractFilter> filter;
@@ -323,17 +323,17 @@ int Console::runBuildDatabase(
 	std::chrono::duration<double> sortingTime{ 0 }, processingTime{ 0 };
 
 	cout << "Processing samples..." << endl;
-	LOG_DEBUG << "Creating Loader object..." << endl;
+	LOG_DEBUG << "Creating Loader object..." << endl ;
 
 	LoaderEx loader(filter, inputFormat, numReaderThreads, numThreads, multisampleFasta);
 	loader.configure(multipleSamples);
 
-	LOG_DEBUG << "Starting loop..." << endl;
+	LOG_DEBUG << "Starting loop..." << endl ;
 	auto totalStart = std::chrono::high_resolution_clock::now();
 	int sample_id = 0;
 	for (; !loader.isCompleted(); ++sample_id) {
 		auto partialTime = std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - totalStart);
-		LOG_VERBOSE << "Processing time: " << partialTime.count() << ", loader buffers: " << (loader.getBytes() >> 20) << " MB" << endl;
+		LOG_VERBOSE << "Processing time: " << partialTime.count() << ", loader buffers: " << (loader.getBytes() >> 20) << " MB" << endl ;
 
 		auto task = loader.popTask(sample_id);
@@ -364,7 +364,7 @@ int Console::runBuildDatabase(
 			processingTime += std::chrono::high_resolution_clock::now() - start;
 			loader.releaseTask(*task);
-			LOG_VERBOSE << db->printProgress() << endl;
+			LOG_VERBOSE << db->printProgress() << endl ;
 		}
 	}
@@ -570,7 +570,7 @@ int Console::runNewVsAll(const std::string& dbFilename, const std::string& multi
 	dt = std::chrono::high_resolution_clock::now() - start;
 	cout << "OK (" << dt.count() << " seconds)" << endl << db.printStats() << endl;
 
-	LOG_DEBUG << "Creating Loader object..." << endl;
+	LOG_DEBUG << "Creating Loader object..." << endl ;
 	shared_ptr<AbstractFilter> filter = shared_ptr<AbstractFilter>(new MinHashFilter(db.getFraction(), db.getStartFraction(), db.getKmerLength()));
 
 	LoaderEx loader(filter, inputFormat, numReaderThreads, numThreads, multisampleFasta);
@@ -595,11 +595,11 @@ int Console::runNewVsAll(const std::string& dbFilename, const std::string& multi
 
 	for (int tid = 0; tid < numThreads; ++tid) {
 		workers[tid] = thread([&db, &loader, &freeBuffersQueue, &similarityQueue, &buffers, &calculator, &sample_id, tid]() {
+			int task_id = sample_id.fetch_add(1);
 			while (!loader.isCompleted()) {
-				int task_id = sample_id.fetch_add(1);
-				std::shared_ptr<SampleTask> task;
-
+				std::shared_ptr<SampleTask> task;
 				if ((task = loader.popTask(task_id)) && freeBuffersQueue.Pop(task->bufferId2)) {
+					LOG_DEBUG << "loader queue " << task_id + 1 << " -> (" << task->id + 1 << ", " << task->sampleName << ")" << endl ;
 					buffers[task->bufferId2].clear();
 
 					// only unique k-mers are needed
@@ -608,12 +608,14 @@ int Console::runNewVsAll(const std::string& dbFilename, const std::string& multi
 					calculator.one2all(db, task->kmers, task->kmersCount, buffers[task->bufferId2]);
 					similarityQueue.Push(task_id, task);
 
-					LOG_DEBUG << "(" << task_id + 1 << ") -> similarity queue, tid:" << tid << ", buf:" << task->bufferId2 << endl;
+					LOG_DEBUG << "(" << task->id + 1 << ", " << task->sampleName << ") -> similarity queue, tid:" << tid << ", buf:" << task->bufferId2 << endl ;
+					task_id = sample_id.fetch_add(1);
+
 				}
 			}
 
 			similarityQueue.MarkCompleted();
-			LOG_DEBUG << "processing finished, tid: " << tid << endl;
+			LOG_DEBUG << "similarity thread completed: " << tid << endl ;
 		});
 	}
@@ -643,7 +645,7 @@ int Console::runNewVsAll(const std::string& dbFilename, const std::string& multi
 			cout << "\r" << task_id + 1 << "... " << std::flush;
 		}
 
-		LOG_DEBUG << "similarity queue -> (" << task_id + 1 << ", " << task->sampleName << "), buf:" << task->bufferId2 << endl;
+		LOG_DEBUG << "similarity queue -> (" << task_id + 1 << ", " << task->sampleName << "), buf:" << task->bufferId2 << endl ;
 		const auto& buf = buffers[task->bufferId2];
 
 		ptr = row;
@@ -837,7 +839,7 @@ int Console::runAnalyzeDatabase(const std::string & multipleKmcSamples, const st
 	LoaderEx loader(filter, InputFile::GENOME, numReaderThreads, numThreads, true);
 	int numSamples = loader.configure(multipleKmcSamples);
 
-	LOG_DEBUG << "Starting loop..." << endl;
+	LOG_DEBUG << "Starting loop..." << endl ;
 	for (int i = 0; i < numSamples; ++i) {
 		auto task = loader.popTask(i);
diff --git a/src/console.h b/src/console.h
index f9b4873..b78d657 100644
--- a/src/console.h
+++ b/src/console.h
@@ -72,8 +72,7 @@ class Console
 	int runMinHash(const std::string& multipleSamples, InputFile::Format inputFormat);
 	int runDistanceCalculation(const std::string& similarityFilename, const std::vector<std::string>& metricNames, bool usePhylip);
-	int runDistanceCalculationPhylip(const std::string& similarityFilename, const std::vector<std::string>& metricNames);
-
+
 	int runListPatterns(const std::string& dbFilename, const std::string& patternFile);
 	int runAnalyzeDatabase(const std::string& multipleKmcSamples, const std::string& dbFilename);
diff --git a/src/hashmap_lp.h b/src/hashmap_lp.h
index 9224a2f..ad1932f 100644
--- a/src/hashmap_lp.h
+++ b/src/hashmap_lp.h
@@ -155,10 +155,6 @@ class hash_map_lp {
 		filled = 0;
 
 		for (size_t i = 0; i < allocated; ++i) {
-			/* if (i % (1 << 15) == 0)
-			{
-				LOG_DEBUG << "Clear: " << i << " from " << allocated << std::endl;
-			} */
 			data[i].key = Key{};
 			data[i].val = empty_value;
 		}
@@ -173,7 +169,7 @@ class hash_map_lp {
 		int n_threads = std::max((int)std::thread::hardware_concurrency(), 1);
 		std::vector<std::thread> threads(n_threads);
 
-		//LOG_DEBUG << "Clearing hashtable (parallel)...";
+		//LOG_DEBUG << "Clearing hashtable (parallel)..." ;
 
 		for (int tid = 0; tid < n_threads; ++tid) {
 			threads[tid] = std::thread([tid, n_threads, this] {
@@ -306,22 +302,22 @@ class hash_map_lp {
 		item_t *old_data = data;
 		size_t old_allocated = allocated;
 
-		// LOG_DEBUG << "reserve_for_additional - in\n";
+		// LOG_DEBUG << "reserve_for_additional - in\n" ;
 
 		while (filled + n_elems > allocated * max_fill_factor)
 			allocated *= 2;
 
-		// LOG_DEBUG << "reserve_for_additional - new_size: " << allocated << std::endl;
+		// LOG_DEBUG << "reserve_for_additional - new_size: " << allocated << std::endl ;
 
 		allocated_mask = allocated - 1ull;
 		size_when_restruct = (size_t)(allocated * max_fill_factor);
 
-		// LOG_NORMAL << "\n--- Realloc to: " << allocated << "..." << std::endl;
+		// LOG_DEBUG << "\n--- Realloc to: " << allocated << "..." << std::endl ;
 
 		data = new item_t[allocated];
 
-		// LOG_DEBUG << "reserve_for_additional - after new: " << allocated << std::endl;
+		// LOG_DEBUG << "reserve_for_additional - after new: " << allocated << std::endl ;
 
 		clear();
 
-		// LOG_DEBUG << "reserve_for_additional - after clear: " << allocated << std::endl;
+		// LOG_DEBUG << "reserve_for_additional - after clear: " << allocated << std::endl ;
 
 		ht_memory += allocated * sizeof(item_t);
diff --git a/src/input_file.cpp b/src/input_file.cpp
index 9527f86..7e47601 100644
--- a/src/input_file.cpp
+++ b/src/input_file.cpp
@@ -136,7 +136,7 @@ bool GenomeInputFile::load(
 
 		kmers = kmersBuffer.data();
 
-		LOG_DEBUG << "Extraction: " << kmersCount << " kmers, " << chromosomes.size() << " chromosomes, " << totalLen << " bases" << endl;
+		//LOG_DEBUG << "Extraction: " << kmersCount << " kmers, " << chromosomes.size() << " chromosomes, " << totalLen << " bases" << endl ;
 	}
 
 	// free memory
@@ -270,7 +270,7 @@ bool GenomeInputFile::unzip(char* compressedData, size_t compressedSize, char*&o
 			//multistream detection
 			if (stream.avail_in >= 2 && stream.next_in[0] == 0x1f && stream.next_in[1] == 0x8b) {
 				if (inflateReset(&stream) != Z_OK) {
-					LOG_NORMAL << "Error while reading gzip file\n";
+					LOG_NORMAL << "Error while reading gzip file\n" ;
 					exit(1);
 				}
 			}
@@ -485,7 +485,7 @@ bool KmcInputFile::load(
 #endif
 
 	filterValue = ((double)kmersCount / _total_kmers); // this may differ from theoretical
-	LOG_DEBUG << "Filter passed: " << kmersCount << "/" << _total_kmers << "(" << filterValue << ")" << endl;
+	//LOG_DEBUG << "Filter passed: " << kmersCount << "/" << _total_kmers << "(" << filterValue << ")" << endl ;
 	filterValue = minhashFilter->getFraction(); // use proper value
 	return kmcfile->Close();
 }
diff --git a/src/kmer_db.h b/src/kmer_db.h
index cf0a546..d343e6b 100644
--- a/src/kmer_db.h
+++ b/src/kmer_db.h
@@ -90,7 +90,7 @@ class AbstractKmerDb {
 		size_t kmersCount,
 		uint32_t kmerLength,
 		double fraction) {
-		LOG_VERBOSE << "Adding sample " << sampleNames.size() + 1 << ": " << sampleName << " (" << kmersCount << " kmers)" << endl;
+		LOG_VERBOSE << "Adding sample " << sampleNames.size() + 1 << ": " << sampleName << " (" << kmersCount << " kmers)" << endl ;
 
 		if (!isInitialized) {
 			initialize(kmerLength, fraction);
diff --git a/src/loader_ex.cpp b/src/loader_ex.cpp
index b245343..de37c7a 100644
--- a/src/loader_ex.cpp
+++ b/src/loader_ex.cpp
@@ -20,7 +20,7 @@ LoaderEx::LoaderEx(
 	std::shared_ptr<AbstractFilter> filter,
 	InputFile::Format inputFormat,
 	int suggestedNumThreads,
-	int outputBuffersCount,
+	int numConsumers,
 	bool multisampleFasta,
 	bool storePositions) :
@@ -31,6 +31,8 @@ LoaderEx::LoaderEx(
 {
 	readers.resize(numThreads);
 
+	int outputBuffersCount = numThreads + numConsumers;
+
 	// configure queues
 	queues.input.Restart(1);
 	queues.readers.Restart(1, outputBuffersCount);
@@ -101,7 +103,7 @@ void LoaderEx::prefetcherJob(std::shared_ptr<AbstractFilter> filter) {
 		std::shared_ptr<InputTask> task;
 		if (this->queues.input.Pop(task)) {
-			LOG_DEBUG << "input queue -> (file " << task->fileId + 1 << ")" << endl << std::flush;
+			LOG_DEBUG << "input queue -> (file " << task->fileId + 1 << ")" << endl ;
 
 			if (this->inputFormat == InputFile::KMC) {
 				task->file = std::make_shared<KmcInputFile>(filter->clone());
@@ -115,16 +117,16 @@ void LoaderEx::prefetcherJob(std::shared_ptr<AbstractFilter> filter) {
 
 			if (task->file->open(task->filePath)) {
 				queues.readers.Push(task);
-				LOG_DEBUG << "(file " << task->fileId + 1 << ") -> readers queue " << endl << std::flush;
+				LOG_DEBUG << "(file " << task->fileId + 1 << ", " << task->filePath << ") -> readers queue " << endl ;
 			}
 			else {
-				cout << "failed:" << task->filePath << endl << std::flush;
+				cout << "failed:" << task->filePath << endl;
 			}
 		}
 	}
 
 	queues.readers.MarkCompleted();
-	LOG_DEBUG << "readers queue: mark completed" << endl << std::flush;
+	LOG_DEBUG << "reader thread completed" << endl ;
 }
 
 // *****************************************************************************************
@@ -139,7 +141,7 @@ void LoaderEx::readerJob(int tid) {
 
 		// get buffer and input task
 		if (this->queues.freeBuffers.Pop(bufferId) && this->queues.readers.Pop(inputTask)) {
-			LOG_DEBUG << "readers queue -> (file " << inputTask->fileId + 1 << "), tid: " << tid << std::flush;
+			LOG_DEBUG << "readers queue -> (file " << inputTask->fileId + 1 << "), tid: " << tid << endl;
 
 			auto sampleTask = make_shared<SampleTask>(
 				inputTask->fileId,
@@ -154,8 +156,8 @@ void LoaderEx::readerJob(int tid) {
 			if (ok) {
 				++bufferRefCounters[bufferId];
 				queues.output.Push((int)sampleTask->id, sampleTask);
-				LOG_DEBUG << "(sample " << sampleTask->id + 1 << ") -> output queue, buf: " << bufferId << std::endl << std::flush;
-				LOG_VERBOSE << "File loaded successfully: " << inputTask->fileId + 1 << endl << std::flush;
+				LOG_DEBUG << "(sample " << sampleTask->id + 1 << ", " << sampleTask->sampleName <<") -> loader output queue, buf: " << bufferId << std::endl ;
+				LOG_VERBOSE << "File loaded successfully: " << inputTask->fileId + 1 << endl ;
 			}
 			else {
 				cout << "File load failed: " << inputTask->fileId + 1 << endl << std::flush;
@@ -164,7 +166,7 @@ void LoaderEx::readerJob(int tid) {
 	}
 
 	queues.output.MarkCompleted();
-	LOG_DEBUG << "output queue: mark completed" << endl << std::flush;
+	LOG_DEBUG << "loader thread completed: " << tid << endl ;
 }
 
@@ -189,7 +191,7 @@ void LoaderEx::multifastaReaderJob() {
 
 	// initialize multifasta file
 	if (genomicFile->initMultiFasta()) {
-		LOG_DEBUG << "multifasta initialized " << endl << std::flush;
+		LOG_DEBUG << "multifasta initialized " << endl ;
 
 		while (true) {
@@ -215,12 +217,12 @@ void LoaderEx::multifastaReaderJob() {
 				queues.output.Push((int)sampleTask->id, sampleTask);
 				++count;
 
-				LOG_DEBUG << "(sample " << sampleTask->id + 1 << ") -> output queue, buf: " << bufferId << std::endl << std::flush;
+				LOG_DEBUG << "(sample " << sampleTask->id + 1 << ") -> output queue, buf: " << bufferId << std::endl ;
 			}
 		}
 
 		if (count > 0) {
-			LOG_VERBOSE << "File loaded successfully: " << inputTask->fileId + 1 << endl << std::flush;
+			LOG_VERBOSE << "File loaded successfully: " << inputTask->fileId + 1 << endl ;
 		}
 		else {
 			cout << "File load failed: " << inputTask->fileId + 1 << endl << std::flush;
@@ -228,6 +230,6 @@ void LoaderEx::multifastaReaderJob() {
 	}
 
 	queues.output.MarkCompleted();
-	LOG_DEBUG << "output queue: mark completed" << endl << std::flush;
+	LOG_DEBUG << "output queue: mark completed" << endl ;
 }
 
diff --git a/src/loader_ex.h b/src/loader_ex.h
index 3b03c3e..5f0fd1d 100644
--- a/src/loader_ex.h
+++ b/src/loader_ex.h
@@ -29,7 +29,7 @@ class LoaderEx {
 		std::shared_ptr<AbstractFilter> filter,
 		InputFile::Format inputFormat,
 		int suggestedNumThreads,
-		int outputBuffersCount,
+		int numConsumers,
 		bool multisampleFasta,
 		bool storePositions = false);
 
@@ -37,17 +37,18 @@ class LoaderEx {
 
 	int configure(const std::string& multipleKmcSamples);
 
-	std::shared_ptr<SampleTask> popTask(int fileId) {
+	std::shared_ptr<SampleTask> popTask(int sampleId) {
 		std::shared_ptr<SampleTask> task;
-		queues.output.Pop(fileId, task);
-		LOG_DEBUG << "output queue -> (sample " << fileId + 1 << ")" << std::endl << std::flush;
+		if (queues.output.Pop(sampleId, task)) {
+			LOG_DEBUG << "output queue -> (sample " << sampleId + 1 << ")" << std::endl;
+		}
 		return task;
 	}
 
 	void releaseTask(SampleTask& t) {
 		if (--bufferRefCounters[t.bufferId] == 0) {
 			queues.freeBuffers.Push(t.bufferId);
-			LOG_DEBUG << "sample " << t.id + 1 << ": release buffer " << t.bufferId << std::endl << std::flush;
+			LOG_DEBUG << "sample " << t.id + 1 << ": release buffer " << t.bufferId << std::endl ;
 		}
 	}
diff --git a/src/log.cpp b/src/log.cpp
index f071153..72b170c 100644
--- a/src/log.cpp
+++ b/src/log.cpp
@@ -25,47 +25,6 @@ uint64_t NumericConversions::powers10[];
 
-// *****************************************************************************************
-//
-Log::Log() : enabled(false), out(&std::cerr)
-{
-}
-
-// *****************************************************************************************
-//
-Log& Log::operator<< (std::ostream& (*pf)(std::ostream&))
-{
-	if (enabled) {
-		*this->out << pf;
-		out->flush();
-	}
-	return *this;
-}
-
-// *****************************************************************************************
-//
-Log& Log::operator<< (std::ios& (*pf)(std::ios&))
-{
-	if (enabled) {
-		*this->out << pf;
-		out->flush();
-	}
-
-	return *this;
-}
-
-// *****************************************************************************************
-//
-Log& Log::operator<< (std::ios_base& (*pf)(std::ios_base&))
-{
-	if (enabled) {
-		*this->out << pf;
-		out->flush();
-	}
-
-	return *this;
-}
-
 // *****************************************************************************************
 //
 std::string Log::formatLargeNumber(uint64_t num, int minWidth) {
diff --git a/src/log.h b/src/log.h
index 9bf0fae..117864a 100644
--- a/src/log.h
+++ b/src/log.h
@@ -12,10 +12,37 @@ Authors: Sebastian Deorowicz, Adam Gudys, Maciej Dlugosz, Marek Kokot, Agnieszka
 
 #include <string>
 #include <vector>
 #include <memory>
+#include <iostream>
+#include <mutex>
 
-#define LOG_NORMAL Log::getInstance(Log::LEVEL_NORMAL)
 #define LOG_VERBOSE Log::getInstance(Log::LEVEL_VERBOSE)
 #define LOG_DEBUG Log::getInstance(Log::LEVEL_DEBUG)
+#define LOG_NORMAL Log::getInstance(Log::LEVEL_DEBUG)
+
+
+class LockedStream {
+	std::ostream* out{ nullptr };
+	std::recursive_mutex* mtx{ nullptr };
+
+public:
+	LockedStream() : mtx() {}
+	LockedStream(std::ostream& out, std::recursive_mutex& mtx) : out(&out), mtx(&mtx) {}
+	~LockedStream() {
+		if (out) {
+			out->flush();
+			mtx->unlock();
+		}
+	}
+
+	template <class T>
+	LockedStream& operator<< (const T& v) { if (out) { *out << v; }; return *this; }
+
+	LockedStream& operator<< (std::ostream& (*pf)(std::ostream&)) { if (out) { *out << pf; }; return *this; }
+	LockedStream& operator<< (std::ios& (*pf)(std::ios&)) { if (out) { *out << pf; }; return *this; }
+	LockedStream& operator<< (std::ios_base& (*pf)(std::ios_base&)) { if (out) { *out << pf; }; return *this; }
+
+};
+
 
 // *****************************************************************************************
 //
@@ -30,37 +57,64 @@ class Log
 	void disable() { enabled = false; }
 
 	// *****************************************************************************************
-	//
 	static Log& getInstance(int level) {
-		static std::vector<std::shared_ptr<Log>> logs;
-		if (logs.size() == 0) {
-			logs.push_back(std::shared_ptr<Log>(new Log()));
-			logs.push_back(std::shared_ptr<Log>(new Log()));
-			logs.push_back(std::shared_ptr<Log>(new Log()));
-		}
-
+		static std::vector<std::shared_ptr<Log>> logs{
+			std::shared_ptr<Log>(new Log()),
+			std::shared_ptr<Log>(new Log()),
+			std::shared_ptr<Log>(new Log())
+		};
+
 		return *logs[level];
 	}
 
 	// *****************************************************************************************
-	//
 	template <class T>
-	Log& operator<<(T v) {
-		if (enabled) { *this->out << v; }
-		return *this;
+	LockedStream operator<<(const T& v) {
+		if (enabled) {
+			mtx.lock();
+			out << v;
+			return LockedStream(out, mtx);
+		}
+		return LockedStream();
 	}
 
-	Log& operator<< (std::ostream& (*pf)(std::ostream&));
-	Log& operator<< (std::ios& (*pf)(std::ios&));
-	Log& operator<< (std::ios_base& (*pf)(std::ios_base&));
+	// *****************************************************************************************
+	LockedStream operator<< (std::ostream& (*pf)(std::ostream&)) {
+		if (enabled) {
+			mtx.lock();
+			out << pf;
+			return LockedStream(out, mtx);
+		}
+		return LockedStream();
+	}
+
+	// *****************************************************************************************
+	LockedStream operator<< (std::ios& (*pf)(std::ios&)) {
+		if (enabled) {
+			mtx.lock();
+			out << pf;
+			return LockedStream(out, mtx);
+		}
+		return LockedStream();
+	}
+
+	// *****************************************************************************************
+	LockedStream operator<< (std::ios_base& (*pf)(std::ios_base&)) {
+		if (enabled) {
+			mtx.lock();
+			out << pf;
+			return LockedStream(out, mtx);
+		}
+		return LockedStream();
+	}
 
 	static std::string formatLargeNumber(uint64_t num, int minWidth = 0);
 
 protected:
 	bool enabled;
-	std::ostream* out;
+	std::ostream& out{std::cerr};
+	std::recursive_mutex mtx;
 
-	Log();
+	Log() : enabled(false) {}
 };
-
diff --git a/src/prefix_kmer_db.cpp b/src/prefix_kmer_db.cpp
index 37bd51f..fd91291 100644
--- a/src/prefix_kmer_db.cpp
+++ b/src/prefix_kmer_db.cpp
@@ -124,7 +124,7 @@ void PrefixKmerDb::hashtableJob() {
 
 				times.hashtableResize_worker += std::chrono::high_resolution_clock::now() - start;
 
-				// LOG_DEBUG << "Block: " << task.block_id << ", lo_prefix: " << lo_prefix << ", hi_prefix: " << hi_prefix << endl << flush;
+				// LOG_DEBUG << "Block: " << task.block_id << ", lo_prefix: " << lo_prefix << ", hi_prefix: " << hi_prefix << endl ;
 
 				start = std::chrono::high_resolution_clock::now();
@@ -182,7 +182,7 @@ void PrefixKmerDb::patternJob() {
 			uint32_t lo = task.lo;
 			uint32_t hi = task.hi;
-			// LOG_DEBUG << "Pattern job " << task.block_id << " started (" << lo << "-" << hi << ")" << endl;
+			// LOG_DEBUG << "Pattern job " << task.block_id << " started (" << lo << "-" << hi << ")" << endl ;
 
 			sample_id_t sampleId = (sample_id_t)(task.sample_id);
@@ -230,7 +230,7 @@ void PrefixKmerDb::patternJob() {
 			// update memory statistics (atomic - no sync needed)
 			stats.patternBytes += deltaSize;
 
-			// LOG_DEBUG << "Pattern job " << task.block_id << " finished" << endl;
+			// LOG_DEBUG << "Pattern job " << task.block_id << " finished" << endl ;
 			this->semaphore.dec();
 		}
 	}
@@ -254,7 +254,7 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	//--------------------------------------------------------------------------
 	// get prefix histogram (parallel)
-	LOG_DEBUG << "Hashtable resizing, searching, and adding (parallel)..." << endl;
+	LOG_DEBUG << "Hashtable resizing, searching, and adding (parallel)..." << endl ;
 	auto start = std::chrono::high_resolution_clock::now();
 	std::fill(prefixHistogram.begin(), prefixHistogram.end(), 0);
 	samplePatterns.resize(n_kmers);
@@ -300,7 +300,7 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	semaphore.inc((int)hashtableTasks.size());
 	for (size_t tid = 0; tid < hashtableTasks.size(); ++tid) {
-		// LOG_DEBUG << "Hashtable job " << tid << " scheduled (" << hashtableTasks[tid].lo << "-" << hashtableTasks[tid].hi << ")" << endl;
+		// LOG_DEBUG << "Hashtable job " << tid << " scheduled (" << hashtableTasks[tid].lo << "-" << hashtableTasks[tid].hi << ")" << endl ;
 		queues.hashtableAddition.Push(hashtableTasks[tid]);
 	}
@@ -316,7 +316,7 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	//--------------------------------------------------------------------------
 	// sort in parallel
-	LOG_DEBUG << "Sorting (parallel)..." << endl;
+	LOG_DEBUG << "Sorting (parallel)..." << endl ;
 	start = std::chrono::high_resolution_clock::now();
 
 	ParallelSort(samplePatterns.data(), samplePatterns.size(), nullptr, 0, 0, num_threads);
@@ -324,13 +324,13 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	//--------------------------------------------------------------------------
 	// extend patterns
-	LOG_DEBUG << "Extending patterns (parallel)..." << endl;
+	LOG_DEBUG << "Extending patterns (parallel)..." << endl ;
 	start = std::chrono::high_resolution_clock::now();
 	std::atomic<pattern_id_t> new_pid((pattern_id_t)patterns.size());
 
 	// prepare tasks
 	num_blocks = num_threads;
-	block = n_kmers / num_blocks;
+	block = std::max(n_kmers / num_blocks, 1u);
 
 	std::vector<PatternTask> patternTasks(num_blocks, PatternTask{ 0, n_kmers, n_kmers, sampleId, &new_pid});
 	patternTasks[0].lo = 0;
@@ -388,7 +388,7 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	semaphore.inc((int)patternTasks.size());
 	for (size_t tid = 0; tid < patternTasks.size(); ++tid) {
-		// LOG_DEBUG << "Pattern job " << tid << " scheduled" << endl;
+		// LOG_DEBUG << "Pattern job " << tid << " scheduled" << endl ;
 		queues.patternExtension.Push(patternTasks[tid]);
 	}
@@ -398,7 +398,7 @@ sample_id_t PrefixKmerDb::addKmers(
 
 	//--------------------------------------------------------------------------
 	// patterns insertion
-	LOG_DEBUG << "Moving patterns to global collection (serial)..." << endl;
+	LOG_DEBUG << "Moving patterns to global collection (serial)..." << endl ;
 
 	// extend by 1.5 on reallocation
 	if (patterns.capacity() < (size_t)new_pid) {
diff --git a/src/queue.h b/src/queue.h
index d04b18d..056fac1 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -301,11 +301,10 @@ class SynchronizedPriorityQueue
 		unique_lock<std::mutex> lck(mtx);
 		cv_queue_empty.wait(lck, [this, priority] {return (!this->q.empty() && q.top().first == priority) || !this->n_producers; });
 
-		if (n_elements == 0)
+		if (n_elements == 0 || q.top().first != priority)
 			return false;
 
-		auto entry = q.top();
-		data = entry.second;
+		data = q.top().second;
 		q.pop();
 		--n_elements;
 //		if (n_elements == 0)
@@ -318,6 +317,7 @@ class SynchronizedPriorityQueue
 	//
 	uint32_t GetSize() {
+		unique_lock<std::mutex> lck(mtx);
 		return n_elements;
 	}
 };
diff --git a/src/similarity_calculator.cpp b/src/similarity_calculator.cpp
index bea7e7e..1a5ae1d 100644
--- a/src/similarity_calculator.cpp
+++ b/src/similarity_calculator.cpp
@@ -467,7 +467,7 @@ void SimilarityCalculator::one2all(const PrefixKmerDb& db, const kmer_t*
 
 	dt = std::chrono::high_resolution_clock::now() - start;
 
-	LOG_VERBOSE << "Pattern listing time: " << dt.count() << endl;
+	//LOG_VERBOSE << "Pattern listing time: " << dt.count() << endl ;
 
 	start = std::chrono::high_resolution_clock::now();
 	std::vector<std::thread> workers(num_threads);
@@ -531,7 +531,7 @@ void SimilarityCalculator::one2all(const PrefixKmerDb& db, const kmer_t*
 	}
 
 	dt = std::chrono::high_resolution_clock::now() - start;
-	LOG_VERBOSE << "Pattern unpacking time: " << dt.count() << endl;
+	//LOG_VERBOSE << "Pattern unpacking time: " << dt.count() << endl ;
 }
 
 // *****************************************************************************************
@@ -594,7 +594,7 @@ void SimilarityCalculator::one2all(const PrefixKmerDb& db, const kmer_t*
 	patterns2count.clear();
 
 	dt = std::chrono::high_resolution_clock::now() - start;
-	LOG_VERBOSE << "Pattern listing time: " << dt.count() << endl;
+	LOG_VERBOSE << "Pattern listing time: " << dt.count() << endl ;
 
 	start = std::chrono::high_resolution_clock::now();
 
@@ -646,5 +646,5 @@ void SimilarityCalculator::one2all(const PrefixKmerDb& db, const kmer_t*
 	}
 
 	dt = std::chrono::high_resolution_clock::now() - start;
-	LOG_VERBOSE << "Pattern unpacking time: " << dt.count() << endl;
+	LOG_VERBOSE << "Pattern unpacking time: " << dt.count() << endl ;
 }
\ No newline at end of file
diff --git a/src/version.h b/src/version.h
index edfb16e..8ec51b7 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1,12 +1,17 @@
 #pragma once
 
-#define VERSION "1.9.3"
-#define DATE "27.08.2021"
+#define VERSION "1.9.4"
+#define DATE "19.04.2022"
 
 /*
 Version history
 
+1.9.4 (19.04.2022)
+- Fixed database construction for very small samples (#k-mers < #threads).
+- Fixed synchronization issues in new2all mode (non-deterministic row order in the output matrix).
+- Fixed a deadlock during database construction when -multisample-fasta mode is run on more than one input file.
+
 1.9.3 (27.08.2021)
 - Disabled h-mer hashtables loading in all2all mode.
 - Fast CSV saving in all2all and new2all modes.
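
The log.h rewrite above replaces the old per-manipulator Log::operator<< overloads with a lock-guard idiom: the first insertion on a Log acquires a recursive mutex and returns a temporary LockedStream; chained insertions go through that temporary, and its destructor flushes the stream and releases the mutex, so a complete logging statement from one thread is never interleaved with output from another. The sketch below is a minimal, self-contained illustration of that idiom, not the project's exact code — the hypothetical Logger class drops Log's level/enable machinery, and it assumes C++17 so the returned guard is constructed in place without copies.

```cpp
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Guard returned by the logger: flushes and unlocks when the statement ends.
class LockedStream {
    std::ostream* out{ nullptr };
    std::recursive_mutex* mtx{ nullptr };
public:
    LockedStream() = default;
    LockedStream(std::ostream& o, std::recursive_mutex& m) : out(&o), mtx(&m) {}
    ~LockedStream() { if (out) { out->flush(); mtx->unlock(); } }

    template <class T>
    LockedStream& operator<<(const T& v) { if (out) { *out << v; } return *this; }
};

// Simplified stand-in for Log: the first << locks, the guard's destructor unlocks.
class Logger {
    std::ostream& out{ std::cerr };
    std::recursive_mutex mtx;
public:
    template <class T>
    LockedStream operator<<(const T& v) {
        mtx.lock();                      // released by ~LockedStream at end of the full expression
        out << v;
        return LockedStream(out, mtx);
    }
};

int main() {
    Logger log;
    std::vector<std::thread> workers;
    for (int tid = 0; tid < 4; ++tid)
        workers.emplace_back([&log, tid] {
            // Each full line stays contiguous even under contention.
            log << "thread " << tid << " finished\n";
        });
    for (auto& w : workers) w.join();
}
```

Built with `g++ -std=c++17 -pthread`, the four lines come out whole (in some order); with a plain unsynchronized stream their fragments could interleave. The recursive mutex mirrors log.h and keeps a nested log call on the same thread from deadlocking.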
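The queue.h change works together with the restructured new2all worker loop toward the 1.9.4 changelog entry about row ordering: a consumer waiting for one specific task id can also be woken by producer shutdown, so Pop must re-check the top priority after the wait before popping. The following sketch models that contract in isolation — OrderedQueue is a hypothetical, simplified stand-in for SynchronizedPriorityQueue (plain std::string payload, ad-hoc names); only the wait/re-check logic mirrors the patch.

```cpp
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Min-heap keyed by task id; each consumer asks for exactly one id, so items
// are handed out in deterministic order no matter which worker produced them.
class OrderedQueue {
    using entry_t = std::pair<uint32_t, std::string>;
    std::priority_queue<entry_t, std::vector<entry_t>, std::greater<entry_t>> q;
    std::mutex mtx;
    std::condition_variable cv_queue_empty;
    int n_producers;

public:
    explicit OrderedQueue(int producers) : n_producers(producers) {}

    void Push(uint32_t priority, std::string data) {
        std::lock_guard<std::mutex> lck(mtx);
        q.emplace(priority, std::move(data));
        cv_queue_empty.notify_all();
    }

    void MarkCompleted() {
        std::lock_guard<std::mutex> lck(mtx);
        --n_producers;
        cv_queue_empty.notify_all();
    }

    // Blocks until the element with exactly this id is on top, or all
    // producers are done; returns false on shutdown.
    bool Pop(uint32_t priority, std::string& data) {
        std::unique_lock<std::mutex> lck(mtx);
        cv_queue_empty.wait(lck, [this, priority] {
            return (!q.empty() && q.top().first == priority) || !n_producers;
        });
        // The re-check added in the patch: a wake-up caused by shutdown must
        // not let this consumer steal an element destined for another id.
        if (q.empty() || q.top().first != priority)
            return false;
        data = q.top().second;
        q.pop();
        return true;
    }
};
```

With consumers calling Pop(0), Pop(1), Pop(2), ... in sequence, results are drained strictly by task id, which is how the patched similarityQueue keeps the output matrix rows in a deterministic order.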