Basic column reading
jeromekelleher committed Jan 30, 2024
1 parent 425ad14 commit ba26792
Showing 1 changed file with 62 additions and 58 deletions.
sgkit/io/vcf/vcf_converter.py
@@ -50,13 +50,24 @@ def flush_futures(futures):
 
 
 class PickleChunkedColumn:
-    def __init__(self, directory):
-        self.directory = directory
+    def __init__(self, path):
+        self.path = path
         self.compressor = numcodecs.Blosc(cname="zstd", clevel=7)
+        self.num_partitions = None
+        self._num_chunks = None
+
+    @property
+    def num_chunks(self):
+        if self._num_chunks is None:
+            self._num_chunks = len(list(self.path.iterdir()))
+        return self._num_chunks
+
+    def __repr__(self):
+        # TODO add class name
+        return repr({"path": str(self.path), "num_chunks": self.num_chunks})
 
     def write_chunk(self, partition_index, chunk_index, data):
-        path = self.directory / f"p{partition_index}" / f"c{chunk_index}"
-        print("Write chunk", path)
+        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
         pkl = pickle.dumps(data)
         # NOTE assuming that reusing the same compressor instance
         # from multiple threads is OK!
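
In isolation, the storage scheme this class implements is: each chunk is a pickled Python list, compressed with Blosc/zstd, and written to a file c{chunk_index} inside a partition directory p{partition_index}. A minimal self-contained sketch of that round trip, assuming only numcodecs and the standard library (the column name and values here are illustrative, not from the commit):

    import pathlib
    import pickle
    import tempfile

    import numcodecs

    compressor = numcodecs.Blosc(cname="zstd", clevel=7)

    # One column ("POS"), one partition ("p0"), one chunk ("c0").
    column_path = pathlib.Path(tempfile.mkdtemp()) / "POS"
    (column_path / "p0").mkdir(parents=True)

    chunk = [100, 250, 391]
    (column_path / "p0" / "c0").write_bytes(compressor.encode(pickle.dumps(chunk)))

    # Reading reverses the two steps: decompress, then unpickle.
    raw = (column_path / "p0" / "c0").read_bytes()
    assert pickle.loads(compressor.decode(raw)) == chunk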
@@ -65,10 +76,15 @@ def write_chunk(self, partition_index, chunk_index, data):
             f.write(compressed)
 
     def read_chunk(self, partition_index, chunk_index):
-        path = self.directory / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
         with open(path, "rb") as f:
             pkl = self.compressor.decode(f.read())
-        return pickle.loads(data)
+        return pickle.loads(pkl)
+
+    def iter_values(self):
+        for partition_index in range(self.num_partitions):
+            for chunk_index in range(self.num_chunks):
+                yield from self.read_chunk(partition_index, chunk_index)
 
 
 class PickleChunkedColumnWriteBuffer:
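
With the read_chunk fix and the new iter_values, the class round-trips end to end. A usage sketch under the definitions above (the temporary directory stands in for a real column path; note that num_chunks counts the entries of the column directory itself, i.e. the partition directories, so this round trip relies on the single partition holding a single chunk):

    import pathlib
    import tempfile

    col = PickleChunkedColumn(pathlib.Path(tempfile.mkdtemp()) / "POS")
    col.path.mkdir(parents=True)
    (col.path / "p0").mkdir()
    col.num_partitions = 1
    col.write_chunk(0, 0, [100, 250, 391])
    print(list(col.iter_values()))  # [100, 250, 391]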
@@ -81,7 +97,6 @@ def __init__(self, column, partition_index, executor, futures, chunk_size=1):
         assert self.max_buffered_bytes > 0
         self.partition_index = partition_index
         self.chunk_index = 0
-        self.column.directory.mkdir(exist_ok=True)
         self.executor = executor
         self.futures = futures
 
@@ -112,27 +127,45 @@ def __init__(self, path, metadata):
         self.path = path
         self.metadata = metadata
 
-    @property
-    def num_partitions(self):
-        return len(self.metadata.partitions)
-
-    def mkdirs(self):
-        self.path.mkdir()
+        self.columns = {}
         fixed_cols = ["CHROM", "POS", "QUAL", "ID", "FILTERS", "REF", "ALT"]
-        field_paths = [self.path / col for col in fixed_cols]
+        for col in fixed_cols:
+            self.columns[col] = PickleChunkedColumn(self.path / col)
         info_path = self.path / "INFO"
-        info_path.mkdir()
         for field in self.metadata.info_fields:
-            field_paths.append(info_path / field.name)
+            self.columns[f"INFO/{field.name}"] = PickleChunkedColumn(
+                info_path / field.name
+            )
         format_path = self.path / "FORMAT"
-        format_path.mkdir()
         for field in self.metadata.format_fields:
-            field_paths.append(format_path / field.name)
+            self.columns[f"FORMAT/{field.name}"] = PickleChunkedColumn(
+                format_path / field.name
+            )
 
-        for field_path in field_paths:
-            field_path.mkdir()
+        for col in self.columns.values():
+            col.num_partitions = self.num_partitions
+
+    @property
+    def num_partitions(self):
+        return len(self.metadata.partitions)
+
+    def mkdirs(self):
+        self.path.mkdir()
+        # fixed_cols = ["CHROM", "POS", "QUAL", "ID", "FILTERS", "REF", "ALT"]
+        # field_paths = [self.path / col for col in fixed_cols]
+        # info_path = self.path / "INFO"
+        # info_path.mkdir()
+        # for field in self.metadata.info_fields:
+        #     field_paths.append(info_path / field.name)
+        # format_path = self.path / "FORMAT"
+        # format_path.mkdir()
+        # for field in self.metadata.format_fields:
+        #     field_paths.append(format_path / field.name)
+
+        for col in self.columns.values():
+            col.path.mkdir(parents=True)
         for j in range(self.num_partitions):
-            part_path = field_path / f"p{j}"
+            part_path = col.path / f"p{j}"
             part_path.mkdir()
 
     @staticmethod
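
For concreteness, after these changes mkdirs lays out one directory per column, with a subdirectory per partition; chunk files are added underneath as c0, c1, ... during conversion. Roughly, for a VCF with one INFO field and one FORMAT field (field names illustrative) and two partitions:

    <path>/
        CHROM/ POS/ QUAL/ ID/ FILTERS/ REF/ ALT/    one directory per fixed column
            p0/ p1/                                 one subdirectory per partition
        INFO/
            DP/
                p0/ p1/
        FORMAT/
            GT/
                p0/ p1/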
Expand Down Expand Up @@ -283,35 +316,6 @@ def make_col(col_path):
service_futures(0)


# class ColumnarisedVcf:
# def __init__(self, path):
# self.path = path
# with open(path / "spec.json") as f:
# d = json.load(f)
# self.spec = ConversionSpecification.fromdict(d)
# self.num_partitions = len(self.spec.partitions)
# self.num_records = sum(part.num_records for part in self.spec.partitions)

# def iter_chunks(self, name):
# for j in range(self.num_partitions):
# partition_dir = self.path / f"partition_{j}" / name
# num_chunks = len(list(partition_dir.iterdir()))
# for k in range(num_chunks):
# chunk_file = partition_dir / f"{k}"
# # print("load", chunk_file)
# with open(chunk_file, "rb") as f:
# chunk = pickle.load(f)
# yield chunk
# def values(self, name):
# """
# Return the full column as a python list.
# """
# ret = []
# for chunk in self.iter_chunks(name):
# ret.extend(chunk)
# return ret


def encode_zarr(
columnarised_path,
out_path,
@@ -320,15 +324,15 @@
     chunk_length=None,
     show_progress=False,
 ):
-    pcv = PickleChunkedVcf(pathlib.Path(columnarised_path))
+    pcv = PickleChunkedVcf.load(pathlib.Path(columnarised_path))
     print(pcv)
-    # cv = ColumnarisedVcf(pathlib.Path(columnarised_path))
-    # pos = np.array(cv.values("POS"), dtype=np.int32)
-    # chrom = np.array(cv.values("CHROM"))
-    # qual = np.array(cv.values("QUAL"))
-    # print(pos)
-    # print(chrom)
-    # print(qual)
+    pos = pcv.columns["POS"]
+    print(pos)
+    print(np.array(list(pos.iter_values())))
+
+    gt = pcv.columns["FORMAT/GT"]
+    print(gt)
+    print(np.array(list(gt.iter_values())))
 
 
 def sync_flush_array(np_buffer, zarr_array, offset):
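
The read path exercised here is the same one sketched earlier: load the converted directory, look a column up by name, and materialise its values. A minimal caller, assuming a directory already produced by the conversion step and the PickleChunkedVcf.load classmethod defined elsewhere in this file (the path is a placeholder):

    import pathlib

    import numpy as np

    pcv = PickleChunkedVcf.load(pathlib.Path("sample.vcf.cols"))
    pos = np.array(list(pcv.columns["POS"].iter_values()), dtype=np.int32)
    gt = np.array(list(pcv.columns["FORMAT/GT"].iter_values()))
    print(pos.shape, gt.shape)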
