Skip to content

Commit

Permalink
Some initial bencmarking on final encoding perf - looks good!
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromekelleher committed Feb 1, 2024
1 parent 9de0913 commit 45b26df
Showing 1 changed file with 125 additions and 100 deletions.
225 changes: 125 additions & 100 deletions sgkit/io/vcf/vcf_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,25 @@ def generate(pcvcf):
)


@dataclasses.dataclass
class BufferedArray:
array: Any
buff: Any
fill_value: Any

def __init__(self, array, fill_value=None):
self.array = array
dims = list(array.shape)
dims[0] = min(array.chunks[0], array.shape[0])
self.fill_value = fill_value
if fill_value is None:
self.fill_value = _dtype_to_fill[array.dtype.str]
self.buff = np.full(dims, self.fill_value, dtype=array.dtype)

def swap_buffers(self):
self.buff = np.full_like(self.buff, self.fill_value)


class SgvcfZarr:
def __init__(self, path):
self.path = pathlib.Path(path)
Expand All @@ -779,7 +798,6 @@ def full_array(name, data, dimensions, *, dtype=None, chunks=None):
a.attrs["_ARRAY_DIMENSIONS"] = dimensions
return a


self.root.attrs["filters"] = pcvcf.metadata.filters
full_array("filter_id", pcvcf.metadata.filters, ["filters"], dtype="str")
full_array("contig_id", pcvcf.metadata.contig_names, ["configs"], dtype="str")
Expand Down Expand Up @@ -874,115 +892,122 @@ def empty_fixed_field_array(name, dtype, shape=None):
a.attrs["_ARRAY_DIMENSIONS"] = dimensions
# print(a)

def encode_flag_column(self, source_col, array):
print("FLAG", source_col, array)
a = np.zeros_like(array)
# print(a)
for j, val in enumerate(source_col.iter_values()):
if val is not None:
a[j] = True
# print(a)
array[:] = a

def encode_gt_int_column(self, source_col, array, executor):
ba = BufferedArray(array)
chunk_length = array.chunks[0]
num_variants = array.shape[0]
futures = []

chunk_start = 0
j = 0
pbar = tqdm.tqdm(total=num_variants, desc=source_col.vcf_field.full_name)
for index, value in enumerate(source_col.iter_values()):
pbar.update(1)
if value is not None:
ba.buff[j] = value.reshape(ba.buff.shape[1:])
j += 1
if j == chunk_length:
flush_futures(futures)
futures.extend(
async_flush_array(executor, ba.buff, ba.array, chunk_start)
)
ba.swap_buffers()
j = 0
chunk_start += chunk_length
if j != 0:
flush_futures(futures)
futures.extend(
async_flush_array(executor, ba.buff[:j], ba.array, chunk_start)
)
pbar.close()

def encode_column(self, pcvcf, column):
source_col = pcvcf.columns[column.vcf_field]
print(source_col)
array = self.root[column.name]
print(array)
try:
a = np.array(list(source_col.iter_values()))
array[:] = a
print("WORKED")
except Exception as e:
print("error", e)
with cf.ThreadPoolExecutor(max_workers=4) as executor:
self.encode_gt_int_column(source_col, array, executor)

@staticmethod
def convert(pcvcf, path, conversion_spec, show_progress=False):
sgvcf = SgvcfZarr(path)
sgvcf.create_arrays(pcvcf, conversion_spec)

for column in conversion_spec.columns:
if "GT" not in column.name:
sgvcf.encode_column(pcvcf, column)



# def sync_flush_array(np_buffer, zarr_array, offset):
# zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer


# def async_flush_array(executor, np_buffer, zarr_array, offset):
# """
# Flush the specified chunk aligned buffer to the specified zarr array.
# """
# assert zarr_array.shape[1:] == np_buffer.shape[1:]
# # print("sync", zarr_array, np_buffer)

# if len(np_buffer.shape) == 1:
# futures = [executor.submit(sync_flush_array, np_buffer, zarr_array, offset)]
# else:
# futures = async_flush_2d_array(executor, np_buffer, zarr_array, offset)
# return futures


# def async_flush_2d_array(executor, np_buffer, zarr_array, offset):
# # Flush each of the chunks in the second dimension separately
# s = slice(offset, offset + np_buffer.shape[0])

# def flush_chunk(start, stop):
# zarr_array[s, start:stop] = np_buffer[:, start:stop]

# chunk_width = zarr_array.chunks[1]
# zarr_array_width = zarr_array.shape[1]
# start = 0
# futures = []
# while start < zarr_array_width:
# stop = min(start + chunk_width, zarr_array_width)
# future = executor.submit(flush_chunk, start, stop)
# futures.append(future)
# start = stop

# return futures


# _dtype_to_fill = {
# "|b1": False,
# "|i1": INT_FILL,
# "<i2": INT_FILL,
# "<i4": INT_FILL,
# "<f4": FLOAT32_FILL,
# "|O": STR_FILL,
# }


# @dataclasses.dataclass
# class BufferedArray:
# array: Any
# buff: Any
# fill_value: Any

# def __init__(self, array, fill_value=None):
# self.array = array
# dims = list(array.shape)
# dims[0] = min(array.chunks[0], array.shape[0])
# self.fill_value = fill_value
# if fill_value is None:
# self.fill_value = _dtype_to_fill[array.dtype.str]
# self.buff = np.full(dims, self.fill_value, dtype=array.dtype)

# def swap_buffers(self):
# self.buff = np.full_like(self.buff, self.fill_value)


# @dataclasses.dataclass
# class BufferedUnsizedField:
# variable_name: str
# buff: list = dataclasses.field(default_factory=list)

# def swap_buffers(self):
# self.buff = []


# def sync_flush_unsized_buffer(buff, file_path):
# with open(file_path, "wb") as f:
# pickle.dump(buff, f)


# def async_flush_unsized_buffer(executor, buf, zarr_path, partition_index, chunk_index):
# dest_file = (
# zarr_path / "tmp" / buf.variable_name / f"{partition_index}.{chunk_index}"
# )
# return [executor.submit(sync_flush_unsized_buffer, buf.buff, dest_file)]
# TODO change this variable to array_name or something, this is
# getting very confusing.
# print(column.name)
# if column.name == "call_GQ":
# if column.name == "variant_position":

# if "GT" not in column.name:

# FIXME we seem to be calling FORMAT/PID as a String type not integer
# for some reason. Need to dig in. Looks like the GIL starts hitting
# when we have large string columns.
if "AB" not in column.name and "GT" not in column.name:
try:
sgvcf.encode_column(pcvcf, column)
except Exception as e:
print("ERROR", e)
# break


def sync_flush_array(np_buffer, zarr_array, offset):
zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer


def async_flush_array(executor, np_buffer, zarr_array, offset):
"""
Flush the specified chunk aligned buffer to the specified zarr array.
"""
assert zarr_array.shape[1:] == np_buffer.shape[1:]
# print("sync", zarr_array, np_buffer)

if len(np_buffer.shape) == 1:
futures = [executor.submit(sync_flush_array, np_buffer, zarr_array, offset)]
else:
futures = async_flush_2d_array(executor, np_buffer, zarr_array, offset)
return futures


def async_flush_2d_array(executor, np_buffer, zarr_array, offset):
# Flush each of the chunks in the second dimension separately
s = slice(offset, offset + np_buffer.shape[0])

def flush_chunk(start, stop):
zarr_array[s, start:stop] = np_buffer[:, start:stop]

chunk_width = zarr_array.chunks[1]
zarr_array_width = zarr_array.shape[1]
start = 0
futures = []
while start < zarr_array_width:
stop = min(start + chunk_width, zarr_array_width)
future = executor.submit(flush_chunk, start, stop)
futures.append(future)
start = stop

return futures


_dtype_to_fill = {
"|b1": False,
"|i1": INT_FILL,
"<i2": INT_FILL,
"<i4": INT_FILL,
"<f4": FLOAT32_FILL,
"|O": STR_FILL,
}


# def write_partition(
Expand Down

0 comments on commit 45b26df

Please sign in to comment.