Skip to content

Commit

Permalink
Add genotypes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromekelleher committed Feb 5, 2024
1 parent 9485893 commit b523498
Showing 1 changed file with 66 additions and 197 deletions.
263 changes: 66 additions & 197 deletions sgkit/io/vcf/vcf_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,15 @@ def fixed_field_spec(name, dtype, vcf_field=None):
dtype="f4",
),
]
# TODO FILTERS and alleles
# empty_fixed_field_array("variant_filter",
# shape=(m, len(vcf_metadata.filters)),
# chunks=(chunk_length),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "filters"]

gt_field = None
for field in pcvcf.metadata.fields:
if field.category == "fixed":
Expand Down Expand Up @@ -951,13 +960,14 @@ def fixed_field_spec(name, dtype, vcf_field=None):
vcf_field=None,
name="call_genotype_phased",
dtype="bool",
shape=shape,
chunks=chunks,
shape=list(shape),
chunks=list(chunks),
dimensions=dimensions,
description="",
compressor=compressor,
)
)
shape += [ploidy]
dimensions += ["ploidy"]
colspecs.append(
ZarrColumnSpec(
Expand Down Expand Up @@ -1014,7 +1024,7 @@ def __init__(self, path):
self.root = None

def create_array(self, variable):
print("CREATE", variable)
# print("CREATE", variable)
a = self.root.empty(
variable.name,
shape=variable.shape,
Expand Down Expand Up @@ -1065,32 +1075,6 @@ def full_array(name, data, dimensions, *, dtype=None, chunks=None):
# dtype=np.int64,
# )

# def empty_fixed_field_array(name, dtype, shape=None):
# a = self.root.empty(
# name,
# shape=(num_variants,),
# dtype=dtype,
# chunks=(spec.chunk_length,),
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]
# return a

# FIXME get dtype from lookup table
# empty_fixed_field_array("variant_contig", np.int16)
# empty_fixed_field_array("variant_position", np.int32)
# empty_fixed_field_array("variant_id", "str")
# empty_fixed_field_array("variant_id_mask", bool)
# empty_fixed_field_array("variant_quality", np.float32)
# TODO FILTER
# empty_fixed_field_array("variant_filter",
# shape=(m, len(vcf_metadata.filters)),
# chunks=(chunk_length),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "filters"]

def encode_column(self, pcvcf, column):
source_col = pcvcf.columns[column.vcf_field]
array = self.root[column.name]
Expand All @@ -1116,9 +1100,7 @@ def encode_column(self, pcvcf, column):
j = 0
chunk_start += chunk_length
if last_bytes_read != bytes_read:
# print("Bytes read", bytes_read, "inc", bytes_read - last_bytes_read)
with progress_counter.get_lock():
# print("progress = ", progress_counter.value)
progress_counter.value += bytes_read - last_bytes_read
last_bytes_read = bytes_read

Expand All @@ -1127,6 +1109,49 @@ def encode_column(self, pcvcf, column):
futures.extend(
async_flush_array(executor, ba.buff[:j], ba.array, chunk_start)
)
flush_futures(futures)

def encode_genotypes(self, pcvcf):
source_col = pcvcf.columns["FORMAT/GT"]
gt = BufferedArray(self.root["call_genotype"])
gt_mask = BufferedArray(self.root["call_genotype_mask"])
gt_phased = BufferedArray(self.root["call_genotype_phased"])
chunk_length = gt.array.chunks[0]

buffered_arrays = [gt, gt_phased, gt_mask]

with cf.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
chunk_start = 0
j = 0
last_bytes_read = 0
for value, bytes_read in source_col.iter_values_bytes():
sanitise_value_int_2d(gt.buff, j, value[:, :-1])
sanitise_value_int_1d(gt_phased.buff, j, value[:, -1])
# TODO set mask from buff[j]

j += 1
if j == chunk_length:
flush_futures(futures)
for ba in buffered_arrays:
futures.extend(
async_flush_array(executor, ba.buff, ba.array, chunk_start)
)
ba.swap_buffers()
j = 0
chunk_start += chunk_length
if last_bytes_read != bytes_read:
with progress_counter.get_lock():
progress_counter.value += bytes_read - last_bytes_read
last_bytes_read = bytes_read

if j != 0:
flush_futures(futures)
for ba in buffered_arrays:
futures.extend(
async_flush_array(executor, ba.buff[:j], ba.array, chunk_start)
)
flush_futures(futures)

@staticmethod
def convert(pcvcf, path, conversion_spec, show_progress=False):
Expand Down Expand Up @@ -1157,17 +1182,23 @@ def convert(pcvcf, path, conversion_spec, show_progress=False):
initargs=(progress_counter,),
) as executor:
futures = []

special_cases = {}
for variable in conversion_spec.variables[:]:
# TODO change this variable to array_name or something, this is
# getting very confusing.
# print(column.name)

# if "GT" in column.name:
# continue
if variable.vcf_field is not None:
print("Encode", variable.name)
# print("Encode", variable.name)
future = executor.submit(sgvcf.encode_column, pcvcf, variable)
futures.append(future)
else:
special_cases[variable.name] = variable

if "call_genotype" in special_cases:
# TODO add mixed ploidy
futures.append(executor.submit(sgvcf.encode_genotypes, pcvcf))

flush_futures(futures)


Expand Down Expand Up @@ -1209,168 +1240,6 @@ def flush_chunk(start, stop):
return futures


# def create_zarr(
# path, vcf_metadata, partitions, *, chunk_length, chunk_width, max_num_alleles
# ):
# sample_id = np.array(vcf_metadata.samples, dtype="O")
# n = sample_id.shape[0]
# m = sum(partition.num_records for partition in partitions)

# store = zarr.DirectoryStore(path)
# root = zarr.group(store=store, overwrite=True)
# compressor = numcodecs.Blosc(
# cname="zstd", clevel=7, shuffle=numcodecs.Blosc.AUTOSHUFFLE
# )

# root.attrs["filters"] = vcf_metadata.filters
# a = root.array(
# "filter_id",
# vcf_metadata.filters,
# dtype="str",
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["filters"]

# a = root.array(
# "sample_id",
# sample_id,
# chunks=(chunk_width,),
# dtype="str",
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["samples"]

# a = root.array(
# "contig_id",
# vcf_metadata.contig_names,
# dtype="str",
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

# if vcf_metadata.contig_lengths is not None:
# a = root.array(
# "contig_length",
# vcf_metadata.contig_lengths,
# dtype=np.int64,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

# a = root.empty(
# "variant_contig",
# shape=(m),
# chunks=(chunk_length),
# dtype=smallest_numpy_int_dtype(len(vcf_metadata.contig_names)),
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]

# a = root.empty(
# "variant_position",
# shape=(m),
# chunks=(chunk_length),
# dtype=np.int32,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]

# a = root.empty(
# "variant_id",
# shape=(m),
# chunks=(chunk_length),
# dtype="str",
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]

# a = root.empty(
# "variant_id_mask",
# shape=(m),
# chunks=(chunk_length),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]

# a = root.empty(
# "variant_quality",
# shape=(m),
# chunks=(chunk_length),
# dtype=np.float32,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]

# a = root.empty(
# "variant_filter",
# shape=(m, len(vcf_metadata.filters)),
# chunks=(chunk_length),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "filters"]

# a = root.empty(
# "call_genotype",
# shape=(m, n, 2),
# chunks=(chunk_length, chunk_width),
# dtype=smallest_numpy_int_dtype(max_num_alleles),
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "samples", "ploidy"]

# a = root.empty(
# "call_genotype_mask",
# shape=(m, n, 2),
# chunks=(chunk_length, chunk_width),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "samples", "ploidy"]

# a = root.empty(
# "call_genotype_phased",
# shape=(m, n),
# chunks=(chunk_length, chunk_width),
# dtype=bool,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "samples"]

# # Create temporary storage for unsized fields
# tmp_dir = path / "tmp"
# tmp_dir.mkdir()

# # print(field)
# field_dir = tmp_dir / "variant_allele"
# field_dir.mkdir()

# for field in vcf_metadata.fields:
# if field.is_fixed_size:
# shape = [m]
# chunks = [chunk_length]
# dimensions = ["variants"]
# if field.category == "FORMAT":
# shape.append(n)
# chunks.append(chunk_width)
# dimensions.append("samples")
# if field.dimension > 1:
# shape.append(field.dimension)
# dimensions.append(field.name)
# a = root.empty(
# field.variable_name,
# shape=shape,
# chunks=chunks,
# dtype=field.dtype,
# compressor=compressor,
# )
# a.attrs["_ARRAY_DIMENSIONS"] = dimensions
# else:
# field_dir = tmp_dir / field.variable_name
# field_dir.mkdir()


# def finalise_zarr(path, spec, chunk_length, chunk_width):
# store = zarr.DirectoryStore(path)
# root = zarr.group(store=store, overwrite=False)
Expand Down

0 comments on commit b523498

Please sign in to comment.