From 45b26df8fbea94ee22e6c89e0e7f8ab11cc9f2fc Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Thu, 1 Feb 2024 23:05:20 +0000 Subject: [PATCH] Some initial bencmarking on final encoding perf - looks good! --- sgkit/io/vcf/vcf_converter.py | 225 +++++++++++++++++++--------------- 1 file changed, 125 insertions(+), 100 deletions(-) diff --git a/sgkit/io/vcf/vcf_converter.py b/sgkit/io/vcf/vcf_converter.py index 585f63b42..fcbdbd7b5 100644 --- a/sgkit/io/vcf/vcf_converter.py +++ b/sgkit/io/vcf/vcf_converter.py @@ -753,6 +753,25 @@ def generate(pcvcf): ) +@dataclasses.dataclass +class BufferedArray: + array: Any + buff: Any + fill_value: Any + + def __init__(self, array, fill_value=None): + self.array = array + dims = list(array.shape) + dims[0] = min(array.chunks[0], array.shape[0]) + self.fill_value = fill_value + if fill_value is None: + self.fill_value = _dtype_to_fill[array.dtype.str] + self.buff = np.full(dims, self.fill_value, dtype=array.dtype) + + def swap_buffers(self): + self.buff = np.full_like(self.buff, self.fill_value) + + class SgvcfZarr: def __init__(self, path): self.path = pathlib.Path(path) @@ -779,7 +798,6 @@ def full_array(name, data, dimensions, *, dtype=None, chunks=None): a.attrs["_ARRAY_DIMENSIONS"] = dimensions return a - self.root.attrs["filters"] = pcvcf.metadata.filters full_array("filter_id", pcvcf.metadata.filters, ["filters"], dtype="str") full_array("contig_id", pcvcf.metadata.contig_names, ["configs"], dtype="str") @@ -874,17 +892,50 @@ def empty_fixed_field_array(name, dtype, shape=None): a.attrs["_ARRAY_DIMENSIONS"] = dimensions # print(a) + def encode_flag_column(self, source_col, array): + print("FLAG", source_col, array) + a = np.zeros_like(array) + # print(a) + for j, val in enumerate(source_col.iter_values()): + if val is not None: + a[j] = True + # print(a) + array[:] = a + + def encode_gt_int_column(self, source_col, array, executor): + ba = BufferedArray(array) + chunk_length = array.chunks[0] + num_variants = array.shape[0] + futures = [] + + chunk_start = 0 + j = 0 + pbar = tqdm.tqdm(total=num_variants, desc=source_col.vcf_field.full_name) + for index, value in enumerate(source_col.iter_values()): + pbar.update(1) + if value is not None: + ba.buff[j] = value.reshape(ba.buff.shape[1:]) + j += 1 + if j == chunk_length: + flush_futures(futures) + futures.extend( + async_flush_array(executor, ba.buff, ba.array, chunk_start) + ) + ba.swap_buffers() + j = 0 + chunk_start += chunk_length + if j != 0: + flush_futures(futures) + futures.extend( + async_flush_array(executor, ba.buff[:j], ba.array, chunk_start) + ) + pbar.close() + def encode_column(self, pcvcf, column): source_col = pcvcf.columns[column.vcf_field] - print(source_col) array = self.root[column.name] - print(array) - try: - a = np.array(list(source_col.iter_values())) - array[:] = a - print("WORKED") - except Exception as e: - print("error", e) + with cf.ThreadPoolExecutor(max_workers=4) as executor: + self.encode_gt_int_column(source_col, array, executor) @staticmethod def convert(pcvcf, path, conversion_spec, show_progress=False): @@ -892,97 +943,71 @@ def convert(pcvcf, path, conversion_spec, show_progress=False): sgvcf.create_arrays(pcvcf, conversion_spec) for column in conversion_spec.columns: - if "GT" not in column.name: - sgvcf.encode_column(pcvcf, column) - - - -# def sync_flush_array(np_buffer, zarr_array, offset): -# zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer - - -# def async_flush_array(executor, np_buffer, zarr_array, offset): -# """ -# Flush the specified chunk aligned buffer to the specified zarr array. -# """ -# assert zarr_array.shape[1:] == np_buffer.shape[1:] -# # print("sync", zarr_array, np_buffer) - -# if len(np_buffer.shape) == 1: -# futures = [executor.submit(sync_flush_array, np_buffer, zarr_array, offset)] -# else: -# futures = async_flush_2d_array(executor, np_buffer, zarr_array, offset) -# return futures - - -# def async_flush_2d_array(executor, np_buffer, zarr_array, offset): -# # Flush each of the chunks in the second dimension separately -# s = slice(offset, offset + np_buffer.shape[0]) - -# def flush_chunk(start, stop): -# zarr_array[s, start:stop] = np_buffer[:, start:stop] - -# chunk_width = zarr_array.chunks[1] -# zarr_array_width = zarr_array.shape[1] -# start = 0 -# futures = [] -# while start < zarr_array_width: -# stop = min(start + chunk_width, zarr_array_width) -# future = executor.submit(flush_chunk, start, stop) -# futures.append(future) -# start = stop - -# return futures - - -# _dtype_to_fill = { -# "|b1": False, -# "|i1": INT_FILL, -# "