diff --git a/pybwa/libbwaaln.pxd b/pybwa/libbwaaln.pxd index 2ae40ce..469daa3 100644 --- a/pybwa/libbwaaln.pxd +++ b/pybwa/libbwaaln.pxd @@ -6,6 +6,9 @@ from libc.stdio cimport FILE cdef extern from "libbwaaln_utils.h": void bwa_cal_pac_pos_with_bwt(const bntseq_t *bns, int n_seqs, bwa_seq_t *seqs, int max_mm, float fnr, bwt_t *bwt) + void bwa_cal_sa_reg_gap_threaded(int tid, bwt_t *const bwt, int n_seqs, bwa_seq_t *seqs, + const gap_opt_t *opt) + cdef extern from "bntseq.h": unsigned char nst_nt4_table[256] diff --git a/pybwa/libbwaaln.pyi b/pybwa/libbwaaln.pyi index 4635da9..f8d6bc7 100644 --- a/pybwa/libbwaaln.pyi +++ b/pybwa/libbwaaln.pyi @@ -22,6 +22,7 @@ class BwaAlnOptions: stop_at_max_best_hits: int | None = None, max_hits: int | None = 3, log_scaled_gap_penalty: bool | None = None, + threads: int | None = None, ) -> None: ... max_mismatches: int # -n # fnr:float # -n @@ -38,6 +39,7 @@ class BwaAlnOptions: max_hits: int # bwa samse -n log_scaled_gap_penalty: bool = True # -L with_md: bool = True # bwa samse -d + threads: int # -t class BwaAln: def __init__(self, prefix: str | Path | None = None, index: BwaIndex | None = None) -> None: ... diff --git a/pybwa/libbwaaln.pyx b/pybwa/libbwaaln.pyx index 8ac88bc..cb82e08 100644 --- a/pybwa/libbwaaln.pyx +++ b/pybwa/libbwaaln.pyx @@ -35,6 +35,7 @@ cdef class BwaAlnOptions: max_hits (int | None): :code:`bwa samse -n ` log_scaled_gap_penalty (in | None): :code:`-L` with_md (bool): output the MD to each alignment in the XA tag, otherwise use :code:`"."` + threads (int): the number of threads to use """ cdef gap_opt_t * _delegate @@ -55,7 +56,8 @@ cdef class BwaAlnOptions: stop_at_max_best_hits: int | None = None, max_hits: int | None = 3, log_scaled_gap_penalty: bool | None = None, - with_md: bool | None = False + with_md: bool | None = False, + threads: int | None = None ): if max_mismatches is not None: self.max_mismatches = max_mismatches @@ -83,6 +85,8 @@ cdef class BwaAlnOptions: self.log_scaled_gap_penalty = 1 if log_scaled_gap_penalty else 0 if with_md is not None: self.with_md = with_md + if threads is not None: + self.threads = threads def __cinit__(self): self._delegate = gap_init_opt() @@ -200,6 +204,14 @@ cdef class BwaAlnOptions: def __set__(self, value: bool): self._with_md = value + property threads: + """:code:`bwa aln -t""" + def __get__(self) -> int: + return self._delegate.n_threads + def __set__(self, value: int): + self._delegate.n_threads = value + + cdef class BwaAln: """The class to align reads with :code:`bwa aln`.""" @@ -397,7 +409,7 @@ cdef class BwaAln: seqs[i].tid = -1 # this is `bwa aln`, and the rest is `bwa samse` - bwa_cal_sa_reg_gap(0, self._index.bwt(), num_seqs, seqs, gap_opt) + bwa_cal_sa_reg_gap_threaded(0, self._index.bwt(), num_seqs, seqs, gap_opt) # create the full alignment for i in range(num_seqs): diff --git a/pybwa/libbwaaln_utils.c b/pybwa/libbwaaln_utils.c index 4f2dd78..79a5535 100644 --- a/pybwa/libbwaaln_utils.c +++ b/pybwa/libbwaaln_utils.c @@ -22,4 +22,31 @@ void bwa_cal_pac_pos_with_bwt(const bntseq_t *bns, int n_seqs, bwa_seq_t *seqs, } p->n_multi = n_multi; } +} + +void bwa_cal_sa_reg_gap_threaded(int tid, bwt_t *const bwt, int n_seqs, bwa_seq_t *seqs, const gap_opt_t *opt) +{ +#ifdef HAVE_PTHREAD + if (opt->n_threads <= 1) { // no multi-threading at all + bwa_cal_sa_reg_gap(0, bwt, n_seqs, seqs, opt); + } else { + pthread_t *tid; + pthread_attr_t attr; + thread_aux_t *data; + int j; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + data = (thread_aux_t*)calloc(opt->n_threads, sizeof(thread_aux_t)); + tid = (pthread_t*)calloc(opt->n_threads, sizeof(pthread_t)); + for (j = 0; j < opt->n_threads; ++j) { + data[j].tid = j; data[j].bwt = bwt; + data[j].n_seqs = n_seqs; data[j].seqs = seqs; data[j].opt = opt; + pthread_create(&tid[j], &attr, worker, data + j); + } + for (j = 0; j < opt->n_threads; ++j) pthread_join(tid[j], 0); + free(data); free(tid); + } +#else + bwa_cal_sa_reg_gap(0, bwt, n_seqs, seqs, opt); +#endif } \ No newline at end of file diff --git a/pybwa/libbwaaln_utils.h b/pybwa/libbwaaln_utils.h index cfd6769..ddb7325 100644 --- a/pybwa/libbwaaln_utils.h +++ b/pybwa/libbwaaln_utils.h @@ -4,5 +4,8 @@ #include "kstring.h" -// Version of "bwa_cal_pac_pos" bwase.c that can use an already loaded forward suffix array (BWT) +// Version of "bwa_cal_pac_pos" bwase.c that can use an already loaded forward suffix array (BWT). void bwa_cal_pac_pos_with_bwt(const bntseq_t *bns, int n_seqs, bwa_seq_t *seqs, int max_mm, float fnr, bwt_t *bwt); + +// Port of running "bwa_cal_sa_reg_gap" in the "bwa_aln_core" method in bwtaln.c so we can support multi-threading. +void bwa_cal_sa_reg_gap_threaded(int tid, bwt_t *const bwt, int n_seqs, bwa_seq_t *seqs, const gap_opt_t *opt); diff --git a/tests/test_bwapy.py b/tests/test_bwapy.py index 7622c2b..8e11971 100644 --- a/tests/test_bwapy.py +++ b/tests/test_bwapy.py @@ -76,6 +76,29 @@ def test_bwaaln(ref_fasta: Path, fastx_record: FastxRecord) -> None: assert rec.cigarstring == "80M" +def test_bwaaln_threading(ref_fasta: Path, fastx_record: FastxRecord) -> None: + opt = BwaAlnOptions(threads=2) + bwa = BwaAln(prefix=ref_fasta) + revcomp_seq = None if not fastx_record.sequence else reverse_complement(fastx_record.sequence) + revcomp_record = FastxRecord(name="revcomp", sequence=revcomp_seq) + + queries = [fastx_record if i % 2 == 0 else revcomp_record for i in range(100)] + recs = bwa.align(opt=opt, queries=queries) + assert len(recs) == len(queries) + for i, rec in enumerate(recs): + if i % 2 == 0: + assert rec.query_name == "test" + assert rec.is_forward + else: + assert rec.query_name == "revcomp" + assert rec.is_reverse + assert not rec.is_paired + assert not rec.is_read1 + assert not rec.is_read2 + assert rec.reference_start == 80 + assert rec.cigarstring == "80M" + + def test_bwamem_options() -> None: # default options options = BwaMemOptions()