From ba267926d2a70ec0389963446f38e7f68fb22e3b Mon Sep 17 00:00:00 2001
From: Jerome Kelleher
Date: Tue, 30 Jan 2024 23:09:26 +0000
Subject: [PATCH] Basic column reading

---
 sgkit/io/vcf/vcf_converter.py | 106 +++++++++++++++++-----------------
 1 file changed, 47 insertions(+), 59 deletions(-)

diff --git a/sgkit/io/vcf/vcf_converter.py b/sgkit/io/vcf/vcf_converter.py
index d06eddb19..b4a8224a4 100644
--- a/sgkit/io/vcf/vcf_converter.py
+++ b/sgkit/io/vcf/vcf_converter.py
@@ -50,13 +50,20 @@ def flush_futures(futures):
 
 
 class PickleChunkedColumn:
-    def __init__(self, directory):
-        self.directory = directory
+    def __init__(self, path):
+        self.path = path
         self.compressor = numcodecs.Blosc(cname="zstd", clevel=7)
+        self.num_partitions = None
+
+    def num_chunks(self, partition_index):
+        # Each partition directory p{j} holds chunk files c0, c1, ...
+        return len(list((self.path / f"p{partition_index}").iterdir()))
+
+    def __repr__(self):
+        return f"PickleChunkedColumn(path={str(self.path)!r})"
 
     def write_chunk(self, partition_index, chunk_index, data):
-        path = self.directory / f"p{partition_index}" / f"c{chunk_index}"
-        print("Write chunk", path)
+        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
         pkl = pickle.dumps(data)
         # NOTE assuming that reusing the same compressor instance
         # from multiple threads is OK!
@@ -65,10 +72,15 @@
         f.write(compressed)
 
     def read_chunk(self, partition_index, chunk_index):
-        path = self.directory / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
         with open(path, "rb") as f:
             pkl = self.compressor.decode(f.read())
-        return pickle.loads(data)
+        return pickle.loads(pkl)
+
+    def iter_values(self):
+        for partition_index in range(self.num_partitions):
+            for chunk_index in range(self.num_chunks(partition_index)):
+                yield from self.read_chunk(partition_index, chunk_index)
 
 
 class PickleChunkedColumnWriteBuffer:
@@ -81,7 +93,6 @@ def __init__(self, column, partition_index, executor, futures, chunk_size=1):
         assert self.max_buffered_bytes > 0
         self.partition_index = partition_index
         self.chunk_index = 0
-        self.column.directory.mkdir(exist_ok=True)
         self.executor = executor
         self.futures = futures
 
@@ -112,27 +123,34 @@ def __init__(self, path, metadata):
         self.path = path
         self.metadata = metadata
 
-    @property
-    def num_partitions(self):
-        return len(self.metadata.partitions)
-
-    def mkdirs(self):
-        self.path.mkdir()
+        self.columns = {}
         fixed_cols = ["CHROM", "POS", "QUAL", "ID", "FILTERS", "REF", "ALT"]
-        field_paths = [self.path / col for col in fixed_cols]
+        for col in fixed_cols:
+            self.columns[col] = PickleChunkedColumn(self.path / col)
         info_path = self.path / "INFO"
-        info_path.mkdir()
         for field in self.metadata.info_fields:
-            field_paths.append(info_path / field.name)
+            self.columns[f"INFO/{field.name}"] = PickleChunkedColumn(
+                info_path / field.name
+            )
         format_path = self.path / "FORMAT"
-        format_path.mkdir()
         for field in self.metadata.format_fields:
-            field_paths.append(format_path / field.name)
+            self.columns[f"FORMAT/{field.name}"] = PickleChunkedColumn(
+                format_path / field.name
+            )
+
+        for col in self.columns.values():
+            col.num_partitions = self.num_partitions
+
+    @property
+    def num_partitions(self):
+        return len(self.metadata.partitions)
 
-        for field_path in field_paths:
-            field_path.mkdir()
+    def mkdirs(self):
+        self.path.mkdir()
+        for col in self.columns.values():
+            col.path.mkdir(parents=True)
             for j in range(self.num_partitions):
-                part_path = field_path / f"p{j}"
+                part_path = col.path / f"p{j}"
                 part_path.mkdir()
 
     @staticmethod
@@ -283,37 +301,7 @@ def make_col(col_path):
         service_futures(0)
 
 
-# class ColumnarisedVcf:
-#     def __init__(self, path):
-#         self.path = path
-#         with open(path / "spec.json") as f:
-#             d = json.load(f)
-#         self.spec = ConversionSpecification.fromdict(d)
-#         self.num_partitions = len(self.spec.partitions)
-#         self.num_records = sum(part.num_records for part in self.spec.partitions)
-
-#     def iter_chunks(self, name):
-#         for j in range(self.num_partitions):
-#             partition_dir = self.path / f"partition_{j}" / name
-#             num_chunks = len(list(partition_dir.iterdir()))
-#             for k in range(num_chunks):
-#                 chunk_file = partition_dir / f"{k}"
-#                 # print("load", chunk_file)
-#                 with open(chunk_file, "rb") as f:
-#                     chunk = pickle.load(f)
-#                 yield chunk
-
-#     def values(self, name):
-#         """
-#         Return the full column as a python list.
-#         """
-#         ret = []
-#         for chunk in self.iter_chunks(name):
-#             ret.extend(chunk)
-#         return ret
-
-
 def encode_zarr(
     columnarised_path,
     out_path,
     *,
@@ -320,16 +308,16 @@ def encode_zarr(
     chunk_width=None,
     chunk_length=None,
     show_progress=False,
 ):
-    pcv = PickleChunkedVcf(pathlib.Path(columnarised_path))
+    pcv = PickleChunkedVcf.load(pathlib.Path(columnarised_path))
     print(pcv)
-    # cv = ColumnarisedVcf(pathlib.Path(columnarised_path))
-    # pos = np.array(cv.values("POS"), dtype=np.int32)
-    # chrom = np.array(cv.values("CHROM"))
-    # qual = np.array(cv.values("QUAL"))
-    # print(pos)
-    # print(chrom)
-    # print(qual)
+    pos = pcv.columns["POS"]
+    print(pos)
+    print(np.array(list(pos.iter_values())))
+
+    gt = pcv.columns["FORMAT/GT"]
+    print(gt)
+    print(np.array(list(gt.iter_values())))
 
 
 def sync_flush_array(np_buffer, zarr_array, offset):
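
A note on the storage scheme for readers who want to try it outside sgkit: each
column is a directory holding one subdirectory per partition (p0, p1, ...), and
each partition holds one Blosc(zstd)-compressed pickle file per chunk (c0, c1,
...). The sketch below is a minimal, self-contained illustration of that layout,
not part of the patch: the ChunkedColumnSketch name, the temporary directory, and
the demo values are hypothetical; only the p{j}/c{k} layout and the
Blosc-over-pickle encoding mirror the code above.

# Minimal sketch of the chunked-pickle column layout assumed by the patch:
# <column>/p<j>/c<k>, each chunk a Blosc(zstd)-compressed pickle.
import pathlib
import pickle
import tempfile

import numcodecs


class ChunkedColumnSketch:
    """Trimmed-down stand-in for PickleChunkedColumn, for demonstration only."""

    def __init__(self, path, num_partitions):
        self.path = pathlib.Path(path)
        self.num_partitions = num_partitions
        self.compressor = numcodecs.Blosc(cname="zstd", clevel=7)

    def mkdirs(self):
        # One directory per partition: p0, p1, ...
        for j in range(self.num_partitions):
            (self.path / f"p{j}").mkdir(parents=True)

    def write_chunk(self, partition_index, chunk_index, data):
        # A chunk is a pickled list of per-record values, compressed
        # with Blosc/zstd before it hits disk.
        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
        with open(path, "wb") as f:
            f.write(self.compressor.encode(pickle.dumps(data)))

    def read_chunk(self, partition_index, chunk_index):
        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
        with open(path, "rb") as f:
            return pickle.loads(self.compressor.decode(f.read()))

    def num_chunks(self, partition_index):
        # Chunk counts vary per partition, so they are discovered
        # from the files present in p<j> rather than stored.
        return len(list((self.path / f"p{partition_index}").iterdir()))

    def iter_values(self):
        # Replay partitions and chunks in order to recover the column.
        for j in range(self.num_partitions):
            for k in range(self.num_chunks(j)):
                yield from self.read_chunk(j, k)


with tempfile.TemporaryDirectory() as tmp:
    col = ChunkedColumnSketch(pathlib.Path(tmp) / "POS", num_partitions=2)
    col.mkdirs()
    col.write_chunk(0, 0, [100, 200])    # partition 0, two chunks
    col.write_chunk(0, 1, [300])
    col.write_chunk(1, 0, [1000, 2000])  # partition 1, one chunk
    assert list(col.iter_values()) == [100, 200, 300, 1000, 2000]

Chunk boundaries are chosen by the writer (PickleChunkedColumnWriteBuffer flushes
when its buffered-bytes limit is reached), so a reader cannot know them in
advance and must discover them from the files on disk, which is what num_chunks
does here.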