From cee4ffd1b9e1053615887759725d01c3f72deed5 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Tue, 30 Jan 2024 12:40:53 +0000 Subject: [PATCH] Add column_chunk_size option --- sgkit/io/vcf/vcf_converter.py | 15 ++++++++++----- vcf2zarr.py | 6 ++++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sgkit/io/vcf/vcf_converter.py b/sgkit/io/vcf/vcf_converter.py index 5e79acebe..c49f40c35 100644 --- a/sgkit/io/vcf/vcf_converter.py +++ b/sgkit/io/vcf/vcf_converter.py @@ -55,11 +55,12 @@ class BufferedList: A list of items that we flush to files of approximately fixed size. """ - def __init__(self, dest_dir, executor, future_to_path, max_buffered_mb=1): + def __init__(self, dest_dir, executor, future_to_path, chunk_size=1): self.dest_dir = dest_dir self.buffer = [] self.buffered_bytes = 0 - self.max_buffered_bytes = max_buffered_mb * 2**20 + # chunk_size is in megabytes + self.max_buffered_bytes = chunk_size * 2**20 assert self.max_buffered_bytes > 0 self.chunk_index = 0 self.dest_dir.mkdir(exist_ok=True) @@ -95,7 +96,7 @@ def flush(self): self.buffered_bytes = 0 -def columnarise_vcf(vcf_path, out_path, *, flush_threads=4, column_buffer_mb=10): +def columnarise_vcf(vcf_path, out_path, *, flush_threads=4, column_chunk_size=16): if out_path.exists(): shutil.rmtree(out_path) @@ -120,7 +121,7 @@ def service_futures(max_waiting=2 * flush_threads): with cf.ThreadPoolExecutor(max_workers=flush_threads) as executor: def make_col(col_path): - return BufferedList(col_path, executor, future_to_path, column_buffer_mb) + return BufferedList(col_path, executor, future_to_path, column_chunk_size) contig = make_col(out_path / "CHROM") pos = make_col(out_path / "POS") @@ -990,6 +991,7 @@ def columnarise( vcfs, out_path, *, + column_chunk_size=16, worker_processes=1, show_progress=False, ): @@ -1021,7 +1023,10 @@ def columnarise( for j, partition in enumerate(spec.partitions): futures.append( executor.submit( - columnarise_vcf, partition.path, out_path / f"partition_{j}" + columnarise_vcf, + partition.path, + out_path / f"partition_{j}", + column_chunk_size=column_chunk_size, ) ) flush_futures(futures) diff --git a/vcf2zarr.py b/vcf2zarr.py index 01f4dfaf3..fcc800e62 100644 --- a/vcf2zarr.py +++ b/vcf2zarr.py @@ -23,8 +23,10 @@ def scan(vcfs): @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) @click.option("-w", "--worker-processes", type=int, default=1) -def columnarise(vcfs, out_path, worker_processes): - cnv.columnarise(vcfs, out_path, worker_processes=worker_processes, show_progress=True) +@click.option("-c", "--column-chunk-size", type=int, default=16) +def columnarise(vcfs, out_path, worker_processes, column_chunk_size): + cnv.columnarise(vcfs, out_path, worker_processes=worker_processes, + column_chunk_size=column_chunk_size,show_progress=True) @click.command @click.argument("columnarised", type=click.Path())