From 5c4a2b8664922ff0534892827e61549f72a4ac26 Mon Sep 17 00:00:00 2001 From: Matt Stone Date: Mon, 16 Dec 2024 13:36:03 -0500 Subject: [PATCH] feat: Codebase Improvements - Modularization and Maintainability While refactoring `compute_summary_stats()`, we found it would be helpful if the parent function tracked a list of output strings, so each helper function can return a single string (or `str | None`) feat: add PixyArgs dataclass to hold pixy args (#40) This PR adds a dataclass, `PixyArgs` to store arguments given to `pixy` from the user at the command line. Addresses the first checkbox on #36. Highlights: * Any arguments that were listed as `optional` in `__main__.py.main()` are represented by a `Union[T, None]`. * There are 2 string-based arguments here that I used `Enum` for: the `Stats` (one or more of: `pi`, `dxy`, and `fst`) and `FSTEstimator` (one of either `wc` or `hudson`). * The `--bypass-invariant-check` that expects a "yes" or "no" value was converted into a `bool`. No additional tests are added in this PR -- this dataclass is only being added here, not used. The next PR will start replacing current functionality and thus, tests will be included in that PR. refactor: Extract `precompute_filtered_variant_array` (#43) This is the first of several planned PRs to refactor the monolithic `compute_summary_stats()` function. It extracts a helper function to precompute the filtered genotype and position arrays. refactor: extract compute_summary_pi (#50) This PR extracts the computation of the summary `pi` statistics. It also introduces a new dataclass, `PiResult`, to capture the results of the `calc_pi` function. To reduce the overhead associated with refactoring, I propose using `Union[T, Literal["NA"]]` in most situations where we'd use `Union[T, None]` refactor: extract compute_summary_dxy (#51) Companion to #50 , this PR extracts `compute_summary_dxy()` feat: `PixyTempResult` (#49) Closes #42. This PR adds `PixyTempResult`, a dataclass that stores output from `pixy` and helps write out results to a tab-delimited file. One test for the `__str__()` method is added. feat: use `PixyTempResult` (#52) Uses the `PixyTempResult` object introduced in #49. **Only used with reworked `pi` and `dxy`-based functions (not `fst`, which is pending additional updates). We can hold this PR in draft form until we finalize the fst functions. refactor: extract `validate_populations_path()` (#59) This PR extracts out the checks related to a user-specific populations_path. I wrote the docstring to reflect what the function does now with the intention of future refactoring (i.e., I don't want to raise a base Exception or use `print` forever). No underlying code is changed as a result of this PR -- it's just code movement. Existing tests cover these changes, albeit indirectly (another item that will be fixed in a future refactoring). refactor: extract `validate_bed_path()` (#54) This PR moves BED file-related validation and code out of `check_and_validate_args` in `core.py` to a new function, `validate_bed_path` in `args_validation.py`. No underlying code is changed, only moved. A future PR will add unit-tests, additional error handling, and other code changes. refactor: extract `validate_sites_path()` (#56) This PR moves sites file-related validation and code out of `check_and_validate_args()` in `core.py` to a new function, `validate_sites_path` in args_validation.py. No underlying code is changed, only moved. A future PR will add unit-tests, additional error handling, and other code changes. There is technically testing coverage here but it's a little indirect -- in `test_main.py` we have a test, `test_malformed_sites_file`, that fails the assertions that were previously in `check_and_validate_args()`. In a future PR we could refactor `run_pixy_helper` to instead be `validate_sites_path`, happy to make that change now or in the future. refactor: extract `validate_vcf_path()` (#58) This PR adds `validate_vcf_path()` to `args_validation.py` and moves code from `core.py` into that function. No underlying code is changed, this is just code movement. refactor: extract out `validate_output_path()` (#60) This PR extracts functionality related to the output_path (output_folder, output_prefix, and temp_file) . As with the other similar PRs, I wrote the docstring to reflect what the function does now with the intention of future refactoring. No underlying code is changed as a result of this PR -- it's just code movement. Existing tests cover these changes, albeit indirectly (another item that will be fixed in a future refactoring). refactor: extract window/interval validation (#64) Refactored during sync refactor: Extract `compute_summary_fst()` (#55) Companion to #50 and #51 refactor: move and clean up `check_and_validate_args()` (#69) This PR moves `check_and_validate_args()` out of `core.py` and into `args_validation.py`. Next, it updates `check_and_validate_args()` to return an instance of `PixyArgs`. `PixyArgs` is then used throughout `__main__.py` instead of the large tuple that was previously returned. Additionally, this PR adds some error handling on the `PixyArgs` class. While refactoring that, I updated the tests to make sure that we were catching bad values passed to `run_pixy_helper`. I added one more unit-test about multiple chromosomes. @msto this PR grew bigger than I planned for, so please let me know if you would prefer multiple, smaller PRs. --------- Co-authored-by: Matt Stone Co-authored-by: Erin McAuley --- .gitignore | 1 + pixy/__main__.py | 227 ++++----- pixy/args_validation.py | 676 ++++++++++++++++++++++++++ pixy/calc.py | 75 +-- pixy/core.py | 958 ++++--------------------------------- pixy/enums.py | 40 ++ pixy/models.py | 125 +++++ pixy/stats/__init__.py | 0 pixy/stats/summary.py | 487 +++++++++++++++++++ pyproject.toml | 1 + tests/conftest.py | 23 +- tests/main/test_main.py | 112 +++-- tests/stats/test_models.py | 114 +++++ 13 files changed, 1793 insertions(+), 1046 deletions(-) create mode 100644 pixy/args_validation.py create mode 100644 pixy/enums.py create mode 100644 pixy/models.py create mode 100644 pixy/stats/__init__.py create mode 100644 pixy/stats/summary.py create mode 100644 tests/stats/test_models.py diff --git a/.gitignore b/.gitignore index a8ddd7a..ad4803c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ output/* docs/_build __pycache__/ +.coverage diff --git a/pixy/__main__.py b/pixy/__main__.py index 9c3dc8d..e5917d1 100644 --- a/pixy/__main__.py +++ b/pixy/__main__.py @@ -1,11 +1,19 @@ #!/usr/bin/env python # coding: utf-8 import typing +import argparse +import logging +import os +import re +import subprocess +import sys +import time from multiprocessing.context import BaseContext from multiprocessing import Manager from multiprocessing.managers import BaseManager from multiprocessing.pool import ApplyResult from typing import List +from typing import Optional import numpy as np import sys @@ -24,6 +32,9 @@ import pixy.core import pixy.calc +from pixy.args_validation import PixyArgs +from pixy.enums import FSTEstimator +from pixy.enums import PixyStat # main pixy function @@ -37,7 +48,17 @@ def main() -> None: help_text = "pixy: unbiased estimates of pi, dxy, and fst from VCFs with invariant sites" version = "1.2.10.beta2" - citation = "Korunes, KL and K Samuk. pixy: Unbiased estimation of nucleotide diversity and divergence in the presence of missing data. Mol Ecol Resour. 2021 Jan 16. doi: 10.1111/1755-0998.13326." + citation = ( + "Korunes, KL and K Samuk. " + "pixy: Unbiased estimation of nucleotide diversity and divergence in the presence of " + "missing data. " + "Mol Ecol Resour. 2021 Jan 16. doi: 10.1111/1755-0998.13326." + ) + # initialize logger + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s:%(funcName)s:%(lineno)s [%(levelname)s]: %(message)s", + ) # initialize arguments parser = argparse.ArgumentParser( @@ -195,41 +216,38 @@ def main() -> None: # returns parsed populaion, chromosome, and sample info print("[pixy] pixy " + version) print("[pixy] See documentation at https://pixy.readthedocs.io/en/latest/") - ( - popnames, - popindices, - chrom_list, - IDs, - temp_file, - output_folder, - output_prefix, - bed_df, - sites_df, - ) = pixy.core.check_and_validate_args(args) + + pixy_args: PixyArgs = pixy.args_validation.check_and_validate_args(args) + popindices = {} + for name in pixy_args.pop_names: + popindices[name] = pixy_args.populations_df[ + pixy_args.populations_df.Population == name + ].callset_index.values + chrom_list = pixy_args.chromosomes print( "\n[pixy] Preparing for calculation of summary statistics: " - + ", ".join(map(str, args.stats)) + + ", ".join(map(str, pixy_args.stats)) ) - if "fst" in args.stats: - if args.fst_type == "wc": + if PixyStat.FST in pixy_args.stats: + if pixy_args.fst_type is FSTEstimator.WC: fst_cite = "Weir and Cockerham (1984)" - elif args.fst_type == "hudson": + elif pixy_args.fst_type is FSTEstimator.HUDSON: fst_cite = "Hudson (1992)" print("[pixy] Using " + fst_cite + "'s estimator of FST.") print( "[pixy] Data set contains " - + str(len(popnames)) + + str(len(pixy_args.pop_names)) + " population(s), " + str(len(chrom_list)) + " chromosome(s), and " - + str(len(IDs)) + + str(len(pixy_args.pop_ids)) + " sample(s)" ) - if args.window_size is not None: + if pixy_args.window_size is not None: print("[pixy] Window size: " + str(args.window_size) + " bp") if args.bed_file is not None: @@ -248,13 +266,13 @@ def main() -> None: ) print( "[pixy] Using " - + str(args.n_cores) + + str(pixy_args.num_cores) + " out of " + str(mp.cpu_count()) + " available CPU cores\n" ) # if in mc mode, set up multiprocessing - if args.n_cores > 1: + if pixy_args.num_cores > 1: # use forking context on linux, and spawn otherwise (macOS) if sys.platform == "linux": ctx: BaseContext = mp.get_context("fork") @@ -290,25 +308,29 @@ def listener(q: Queue, temp_file: str) -> None: listener, args=( q, - temp_file, + str(pixy_args.temp_file), ), ) # begin processing each chromosome - for chromosome in chrom_list: + for chromosome in pixy_args.chromosomes: print("[pixy] Processing chromosome/contig " + chromosome + "...") # if not using a bed file, build windows manually - if args.bed_file is None: - # if an interval is specified, assign it - if args.interval_start is not None: - interval_start: int = int(args.interval_start) - interval_end: int = int(args.interval_end) - + if pixy_args.bed_df is None: + if pixy_args.window_size is not None: + window_size: int = int(pixy_args.window_size) + if pixy_args.has_interval: # if an interval is specified, assign it + # mypy cannot infer that by this point, + # `interval_start` and `interval_end` are not `None`; we assert they are not `None` + assert pixy_args.interval_start is not None + assert pixy_args.interval_end is not None + interval_start: int = int(pixy_args.interval_start) + interval_end: int = int(pixy_args.interval_end) # otherwise, get the interval from the VCF's POS column else: - if args.sites_file is None: + if pixy_args.sites_df is None: chrom_max: List[str] = ( subprocess.check_output( "tabix " + args.vcf + " " + chromosome + " | cut -f 2 | tail -n 1", @@ -320,42 +342,29 @@ def listener(q: Queue, temp_file: str) -> None: interval_start = 1 interval_end = int(chrom_max[0]) else: - sites_df = pandas.read_csv( - args.sites_file, - sep="\t", - usecols=[0, 1], - names=["CHROM", "POS"], - ) - sites_df["CHROM"] = sites_df["CHROM"].astype(str) - sites_pre_list = sites_df[sites_df["CHROM"] == chromosome] + sites_pre_list = pixy_args.sites_df[pixy_args.sites_df["CHROM"] == chromosome] sites_pre_list = sorted(sites_pre_list["POS"].tolist()) interval_start = min(sites_pre_list) interval_end = max(sites_pre_list) # final check if intervals are valid - try: - if interval_start > interval_end: - raise ValueError() - except ValueError as e: - raise Exception( - "[pixy] ERROR: The specified interval start (" - + str(interval_start) - + ") exceeds the interval end (" - + str(interval_end) - + ")" - ) from e + if interval_start > interval_end: + raise ValueError( + f"The specified interval start {interval_start} exceeds the specified " + f"interval end {interval_end}" + ) targ_region = chromosome + ":" + str(interval_start) + "-" + str(interval_end) print("[pixy] Calculating statistics for region " + targ_region + "...") # Determine list of windows over which to compute stats - - # window size - window_size = args.window_size - - # in the case were window size = 1, AND there is a sites file, use the sites file as the 'windows' - if window_size == 1 and args.sites_file is not None: + # in the case were window size = 1, AND there is a sites file, use the sites file as the + # 'windows' + if ( + pixy_args.sites_df is not None and window_size == 1 + ): # TODO: dig into why `window_size` might be unbound + # reference https://github.com/fulcrumgenomics/pixy-dev/issues/70 window_list = [list(a) for a in zip(sites_pre_list, sites_pre_list)] else: # if the interval is smaller than one window, make a list of length 1 @@ -380,9 +389,12 @@ def listener(q: Queue, temp_file: str) -> None: window_list = [list(a) for a in zip(window_pos_1_list, window_pos_2_list)] - # Set aggregate to true if 1) the window size is larger than the chunk size OR 2) the window size wasn't specified, but the chrom is longer than the cutoff - if (window_size > args.chunk_size) or ( - (args.window_size is None) and ((interval_end - interval_start) > args.chunk_size) + # Set aggregate to true if + # 1) the window size is larger than the chunk size OR + # 2) the window size wasn't specified, but the chrom is longer than the cutoff + if (window_size > pixy_args.chunk_size) or ( + (pixy_args.window_size is None) + and ((interval_end - interval_start) > pixy_args.chunk_size) ): aggregate = True else: @@ -391,7 +403,7 @@ def listener(q: Queue, temp_file: str) -> None: # if using a bed file, subset the bed file for the current chromosome else: aggregate = False - bed_df_chrom = bed_df[bed_df["chrom"] == chromosome] + bed_df_chrom = pixy_args.bed_df.loc[pixy_args.bed_df["chrom"] == chromosome] window_list = [ list(a) for a in zip(bed_df_chrom["chromStart"], bed_df_chrom["chromEnd"]) ] @@ -403,30 +415,24 @@ def listener(q: Queue, temp_file: str) -> None: # if aggregating, break down large windows into smaller windows if aggregate: - window_list = pixy.core.assign_subwindows_to_windows(window_list, args.chunk_size) + window_list = pixy.core.assign_subwindows_to_windows(window_list, pixy_args.chunk_size) # using chunk_size, assign windows to chunks - window_list = pixy.core.assign_windows_to_chunks(window_list, args.chunk_size) + window_list = pixy.core.assign_windows_to_chunks(window_list, pixy_args.chunk_size) # if using a sites file, assign sites to chunks, as with windows above - if args.sites_file is not None: - sites_df = pandas.read_csv( - args.sites_file, sep="\t", usecols=[0, 1], names=["CHROM", "POS"] - ) - sites_df["CHROM"] = sites_df["CHROM"].astype(str) - sites_pre_list = sites_df[sites_df["CHROM"] == chromosome] + if pixy_args.sites_df is not None: + sites_pre_list = pixy_args.sites_df[pixy_args.sites_df["CHROM"] == chromosome] sites_pre_list = sites_pre_list["POS"].tolist() - - sites_list = pixy.core.assign_sites_to_chunks(sites_pre_list, args.chunk_size) + sites_list = pixy.core.assign_sites_to_chunks(sites_pre_list, pixy_args.chunk_size) else: - sites_df = None - + sites_list = None # obtain the list of chunks from the window list chunk_list = [i[2] for i in window_list] chunk_list = list(set(chunk_list)) # if running in mc mode, send the summary stats function to the jobs pool - if args.n_cores > 1: + if pixy_args.num_cores > 1: # the list of jobs to be launched jobs = [] @@ -435,11 +441,12 @@ def listener(q: Queue, temp_file: str) -> None: window_list_chunk = [x for x in window_list if x[2] == chunk] # and for the site list (if it exists) - if sites_df is not None: - sites_list_chunk = [x for x in sites_list if x[1] == chunk] - sites_list_chunk = [x[0] for x in sites_list_chunk] - else: + sites_list_chunk: Optional[List[int]] + if sites_list is None: sites_list_chunk = None + else: + chunk_sites_lists: List[List[int]] = [x for x in sites_list if x[1] == chunk] + sites_list_chunk = [x[0] for x in chunk_sites_lists] # determine the bounds of the chunk chunk_pos_1 = min(window_list_chunk, key=lambda x: x[1])[0] @@ -450,9 +457,9 @@ def listener(q: Queue, temp_file: str) -> None: pixy.core.compute_summary_stats, args=( args, - popnames, + pixy_args.pop_names, popindices, - temp_file, + pixy_args.temp_file, chromosome, chunk_pos_1, chunk_pos_2, @@ -470,15 +477,15 @@ def listener(q: Queue, temp_file: str) -> None: job.get() # if running in single core mode, loop over the function manually - elif args.n_cores == 1: + elif pixy_args.num_cores == 1: for chunk in chunk_list: # create a subset of the window list specific to this chunk window_list_chunk = [x for x in window_list if x[2] == chunk] # and for the site list (if it exists) - if sites_df is not None: - sites_list_chunk = [x for x in sites_list if x[1] == chunk] - sites_list_chunk = [x[0] for x in sites_list_chunk] + if pixy_args.sites_df is not None and sites_list is not None: + chunk_sites_lists = [x for x in sites_list if x[1] == chunk] + sites_list_chunk = [x[0] for x in chunk_sites_lists] else: sites_list_chunk = None @@ -492,9 +499,9 @@ def listener(q: Queue, temp_file: str) -> None: # compute summary stats for all windows in the chunk window list pixy.core.compute_summary_stats( args, - popnames, + pixy_args.pop_names, popindices, - temp_file, + pixy_args.temp_file, chromosome, chunk_pos_1, chunk_pos_2, @@ -506,7 +513,7 @@ def listener(q: Queue, temp_file: str) -> None: ) # clean up any remaining jobs and stop the listener - if args.n_cores > 1: + if pixy_args.num_cores > 1: q.put("kill") pool.close() pool.join() @@ -516,35 +523,37 @@ def listener(q: Queue, temp_file: str) -> None: # check if there is any output to process # halt execution if not try: - outpanel: pandas.DataFrame = pandas.read_csv(temp_file, sep="\t", header=None) - except pandas.errors.EmptyDataError: + outpanel: pandas.DataFrame = pandas.read_csv(pixy_args.temp_file, sep="\t", header=None) + except pandas.errors.EmptyDataError as e: raise Exception( "[pixy] ERROR: pixy failed to write any output. Confirm that your bed/sites files and intervals refer to existing chromosomes and positions in the VCF." ) # check if particular stats failed to generate output - successful_stats = np.unique(outpanel[0]) - # if not all requested stats were generated, produce a warning # and then remove the failed stats from the args list - if set(args.stats) != set(successful_stats): - missing_stats = list(set(args.stats) - set(successful_stats)) + # TODO: add a unit-test for coverage here after closing + # https://github.com/fulcrumgenomics/pixy-dev/issues/12 + successful_stats = [PixyStat(stat) for stat in np.unique(outpanel[0])] + + if set(pixy_args.stats) != set(successful_stats): + missing_stats = list(set(pixy_args.stats) - set(successful_stats)) print( - "\n[pixy] WARNING: pixy failed to find any valid gentoype data to calculate the following summary statistics: " - + ", ".join(missing_stats) + "\n[pixy] WARNING: pixy failed to find any valid gentoype data to calculate the " + "following summary statistics: " + + ", ".join([str(stat) for stat in missing_stats]) + ". No output file will be created for these statistics." ) - args.stats = set(successful_stats) outpanel[3] = outpanel[3].astype(str) # force chromosome IDs to string outgrouped = outpanel.groupby([0, 3]) # groupby statistic, chromosome # enforce chromosome IDs as strings - chrom_list = list(map(str, chrom_list)) + chrom_list = list(map(str, pixy_args.chromosomes)) stat: str - if "pi" in args.stats: + if PixyStat.PI in successful_stats: stat = "pi" - pi_file: str = str(output_prefix) + "_pi.txt" + pi_file: str = pixy_args.output_prefix + "_pi.txt" if os.path.exists(pi_file): os.remove(pi_file) @@ -580,7 +589,7 @@ def listener(q: Queue, temp_file: str) -> None: [0, 2], axis=1, inplace=True ) # get rid of "pi" and placeholder (NA) columns outsorted = pixy.core.aggregate_output( - outpi, stat, chromosome, window_size, args.fst_type + outpi, stat, chromosome, window_size, pixy_args.fst_type.value ) outsorted.to_csv( outfile, sep="\t", mode="a", header=False, index=False, na_rep="NA" @@ -604,9 +613,9 @@ def listener(q: Queue, temp_file: str) -> None: outfile.close() - if "dxy" in args.stats: + if PixyStat.DXY in successful_stats: stat = "dxy" - dxy_file: str = str(output_prefix) + "_dxy.txt" + dxy_file: str = pixy_args.output_prefix + "_dxy.txt" if os.path.exists(dxy_file): os.remove(dxy_file) @@ -642,7 +651,7 @@ def listener(q: Queue, temp_file: str) -> None: ) # get this statistic, this chrom only outdxy.drop([0], axis=1, inplace=True) # get rid of "dxy" outsorted = pixy.core.aggregate_output( - outdxy, stat, chromosome, window_size, args.fst_type + outdxy, stat, chromosome, window_size, pixy_args.fst_type.value ) outsorted.to_csv( outfile, sep="\t", mode="a", header=False, index=False, na_rep="NA" @@ -664,9 +673,9 @@ def listener(q: Queue, temp_file: str) -> None: outfile.close() - if "fst" in args.stats: + if PixyStat.FST in successful_stats: stat = "fst" - fst_file: str = str(output_prefix) + "_fst.txt" + fst_file: str = pixy_args.output_prefix + "_fst.txt" if os.path.exists(fst_file): os.remove(fst_file) @@ -713,7 +722,7 @@ def listener(q: Queue, temp_file: str) -> None: if chromosome_has_data: outfst.drop([0], axis=1, inplace=True) # get rid of "fst" outsorted = pixy.core.aggregate_output( - outfst, stat, chromosome, window_size, args.fst_type + outfst, stat, chromosome, window_size, pixy_args.fst_type.value ) outsorted = outsorted.iloc[:, :-3] # drop components (for now) outsorted.to_csv( @@ -767,11 +776,13 @@ def listener(q: Queue, temp_file: str) -> None: # remove the temp file(s) if args.keep_temp_file is not True: - os.remove(temp_file) + os.remove(pixy_args.temp_file) # confirm output was generated successfully outfolder_files = [ - f for f in os.listdir(output_folder) if os.path.isfile(os.path.join(output_folder, f)) + f + for f in os.listdir(pixy_args.output_dir) + if os.path.isfile(os.path.join(pixy_args.output_dir, f)) ] r = re.compile(".*_dxy.*|.*_pi.*|.*_fst.*") @@ -793,10 +804,10 @@ def listener(q: Queue, temp_file: str) -> None: ) total_time = time.time() - start_time print("[pixy] Time elapsed: " + time.strftime("%H:%M:%S", time.gmtime(total_time))) - print("[pixy] Output files written to: " + output_folder) + print("[pixy] Output files written to: " + str(pixy_args.output_dir)) if len(leftover_tmp_files) > 0: - print("\n[pixy] NOTE: There are pixy temp files in " + output_folder) + print("\n[pixy] NOTE: There are pixy temp files in " + str(pixy_args.output_dir)) print( "[pixy] If these are not actively being used (e.g. by another running pixy process), they can be safely deleted." ) diff --git a/pixy/args_validation.py b/pixy/args_validation.py new file mode 100644 index 0000000..047772e --- /dev/null +++ b/pixy/args_validation.py @@ -0,0 +1,676 @@ +import argparse +import logging +import os +import shutil +import subprocess +import uuid +from dataclasses import dataclass +from functools import cached_property +from pathlib import Path +from typing import List +from typing import Tuple +from typing import Union + +import allel +import multiprocess as mp +import numpy as np +import pandas +from numpy.typing import NDArray + +from pixy.enums import FSTEstimator +from pixy.enums import PixyStat + + +@dataclass(frozen=True) +class PixyArgs: + """ + Holds settings for running `pixy`. + + `pixy` has the following mutually exclusive, valid groups of inputs: + * a `vcf_path`, a `populations_path`, and a `window_size` + * a `vcf_path`, a `populations_path`, and a `bed_path` + * a `vcf_path`, a `populations_path`, a `window_size`, `interval_start`, `interval_end`, and + a single value for `chromosomes` + + Note that if a BED path is specified, the user is prohibited from specifying a window size or + interval. If no BED path is specified, the user is required to specify a window size, and the + interval is optional. When specifying an interval, both the start and end are required. + + Attributes: + stats: list of statistics for `pixy` to calculate from the input VCF + vcf_path: path to the input VCF (bgzipped and indexed) + populations_df: a pandas DataFrame derived from a user-specified path to a headerless + tab-separated populations file + num_cores: number of CPUs to utilize for parallel processing (default = 1) + bypass_invariant_check: whether to allow computation of stats without invariant sites + (this option is never recommended and defaults to False) + bed_df: a pandas DataFrame derived from a user-specified path to a headerless BED file. + Empty DataFrame if a path is not given. + output_dir: an optional path to which outputs will be written; default is current directory + output_prefix: an optional prefix with which to prepend to `pixy` output (default is `pixy`) + chromosomes: an optional comma-separated list of chroms over which stats will be calculated + (defaults to all chromosomes in a given VCF) + window_size: an optional length of base pairs over which to calculate stats + interval_start: an optional 1-based position demarcating the start of an interval over which + to calculate stats (only valid when calculating over a single chromosome) + interval_end: an optional 1-based position demarcating the end of an interval over which + to calculate stats (only valid when calculating over a single chromosome) + sites_df: a pandas DataFrame derived from a user-specified path to a headerless + tab-separated file that defines specific sites for which summary stats should be + calculated. Empty DataFrame if a path is not given. + chunk_size: the approximate number of sites to read from VCF at any given time + fst_type: the FST estimator to use, one of either 'WC' (Weir and Cockerham 1984) or + 'HUDSON' (Hudson 1992, Bhatia et al. 2013). Defaults to 'WC'. + temp_file: a Path to which to write intermediate `pixy` results, assigned based on the value + of `output_dir` + + Raises: + ValueError: if an interval is specified without both start and end positions + ValueError: if a BED file is specified, and a window size or interval are also specified + ValueError: if a BED file is not specified, and a window size is not specified + """ + + stats: List[PixyStat] + vcf_path: Path + populations_df: pandas.DataFrame + output_dir: Path + temp_file: Path + chromosomes: List[str] + num_cores: int = 1 + bypass_invariant_check: bool = False + fst_type: FSTEstimator = FSTEstimator.WC + output_prefix: str = "pixy" + chunk_size: int = 100000 + bed_df: Union[pandas.DataFrame, None] = None + window_size: Union[int, None] = None + interval_start: Union[int, None] = None + interval_end: Union[int, None] = None + sites_df: Union[pandas.DataFrame, None] = None + + def __post_init__(self) -> None: + """Checks a subset of mutually exclusive `pixy` args to ensure compliance.""" + if self.interval_start is None != self.interval_end is None: + raise ValueError( + "interval_start and interval_end must be specified together or not at all" + ) + + if self.bed_df is None == self.window_size is None: + raise ValueError("One but not both of a BED file or a window size must be specified") + + if self.bed_df is not None and self.interval_start is not None: + raise ValueError("An interval cannot be specified with a BED file") + + if self.interval_start is not None and self.interval_end is not None: + if self.interval_start > self.interval_end: + raise ValueError( + f"The specified interval start {self.interval_start} exceeds the specified " + f"interval end {self.interval_end}" + ) + + @property + def has_interval(self) -> bool: + """True if the `pixy` args included an interval.""" + # The post-init check enforces that the start exists if the end exists + return self.interval_start is not None + + @cached_property + def pop_names(self) -> NDArray[np.string_]: + """ + Returns the list of unique population names from the provided `populations_df`. + + NB, we cast to a datatype of np.str_ for `mypy`, which cannot infer the specific datatypes + within the array. + """ + return np.array(self.populations_df["Population"].unique(), dtype=np.str_) + + @cached_property + def pop_ids(self) -> NDArray[np.string_]: + """ + Returns the list of unique population identifiers from the provided `populations_df`. + + NB, we cast to a datatype of np.str_ for `mypy`, which cannot infer the specific datatypes + within the array. + """ + return np.array(self.populations_df["ID"].unique(), dtype=np.str_) + + +def validate_populations_path(populations_path: Path) -> pandas.DataFrame: + """ + Validates user-specified path to a populations file and its contents. + + A valid populations file has at least 2 columns in it, the first of which must be a sample + identifier. The second column must be the corresponding population to which that sample id + belongs. The sample identifier is assumed to be alphanumeric. + + Raises: + Exception, if the specific populations_path does not exist + Exception, if any of the rows in the populations_path is missing data + + Returns: + A pandas DataFrame containing the populations file contents, with one row for each row + in the input populations file. + This DataFrame includes two columns ("ID": str, "Population": str). + """ + # read in the list of samples/populations + if not os.path.exists(populations_path): + raise FileNotFoundError(f"The specified populations file {populations_path} does not exist") + + poppanel: pandas.DataFrame = pandas.read_csv( + populations_path, sep="\t", usecols=[0, 1], names=["ID", "Population"] + ) + poppanel["ID"] = poppanel["ID"].astype(str) + + # check for missing values + + if poppanel.isnull().values.any(): + raise ValueError( + "The specified populations file contains missing data, " + "confirm all samples have population IDs are are assigned to a population." + ) + + return poppanel + + +def validate_bed_path(bed_path: Path) -> pandas.DataFrame: + """ + Validate and load the user-specified path to a BED file. + + The validation checks that the BED file exists and contains three columns. Chromosome names are + coerced to string before returning. + + Args: + bed_path: A path to a BED file. + This file must be in BED3 format (three columns; chrom, start, and end). + + Returns: + A pandas DataFrame containing the BED file contents, with one row for each row in the BED + file. + + This DataFrame includes three columns ("chrom": str, "pos1": int, "pos2": int). + + Raises: + Exception: If the provided filepath does not exist. + Exception: If the BED file contains any rows with missing fields. + Every row must include three fields - chrom, start, and stop. + Exception: If the BED file is not in BED3 format; i.e. if the BED does not contain at + least three columns. + """ + if not os.path.exists(bed_path): + raise FileNotFoundError(f"The specified BED file {bed_path} does not exist") + + # read in the bed file and extract the chromosome column + bed_df = pandas.read_csv(bed_path, sep="\t", usecols=[0, 1, 2], names=["chrom", "pos1", "pos2"]) + + # force chromosomes to strings + bed_df["chrom"] = bed_df["chrom"].astype(str) + + if bed_df.isnull().values.any(): + raise ValueError( + "The specified BED file contains missing data, confirm all rows have all " + "three fields (chrom, pos1, pos2)." + ) + + if len(bed_df.columns) != 3: + raise ValueError(f"The specified BED file has {len(bed_df.columns)} columns; expected 3.") + + else: + bed_df.columns = ["chrom", "chromStart", "chromEnd"] + + return bed_df + + +def validate_sites_path(sites_path: Path) -> pandas.DataFrame: + """ + Validates user-specified path to a sites file, if provided to `pixy`. + + A valid sites file has no header and at least 2 columns in it. The first column must be a + chromosome and the second column must be a position. Position is assumed to be an int. + Chromosome names are coerced to a string before returning the dataframe. + + Raises: + Exception, if the specific sites path does not exist + Exception, if any of the rows in the sites_path is missing data + + Returns: + A pandas DataFrame containing the sites file contents, with one row for each row + in the input sites file. The dataframe has two columns ("CHROM": str, "POS": str). + """ + if not os.path.exists(sites_path): + raise FileNotFoundError(f"The specified sites file {sites_path} does not exist") + + sites_df = pandas.read_csv(sites_path, sep="\t", usecols=[0, 1], names=["chrom", "pos"]) + sites_df["chrom"] = sites_df["chrom"].astype(str) + + if sites_df.isnull().values.any(): + raise ValueError( + "The specified sites file contains missing data, confirm all rows each " + "have two fields (chrom, pos)." + ) + + if len(sites_df.columns) != 2: + raise ValueError(f"The specified BED file has {len(sites_df.columns)} columns; expected 2.") + + else: + sites_df.columns = ["CHROM", "POS"] + + return sites_df + + +def validate_vcf_path(vcf_path: str) -> None: + """ + Validates user-specified path to a VCF file. + + Additional validation of VCF file contents is currently performed outside of this function. + + Args: + vcf_path: the path to the VCF file of interest + + Raises: + Exception, if the specific VCF path does not exist + Exception, if the VCF has no .gz extension (e.g., is not compressed with `bgzip`) + Exception, if the VCF is not indexed (e.g., lacking either a `.csi` or `.tbi` extension) + + """ + if not os.path.exists(vcf_path): + raise FileNotFoundError(f"The specified VCF {vcf_path} does not exist.") + + if not vcf_path.endswith(".gz"): + raise ValueError( + "The vcf is not compressed with bgzip (or has no .gz extension). " + 'To fix this, run "bgzip [filename].vcf" first (and then index with ' + '"tabix [filename].vcf.gz" if necessary)' + ) + + if not (os.path.exists(vcf_path + ".tbi") or os.path.exists(vcf_path + ".csi")): + raise ValueError( + "The vcf is not indexed. Please either use `tabix` or `bcftools` to" + "produce a `.tbi` or `.csi` index." + ) + + +def validate_output_path(output_folder: str, output_prefix: str) -> Tuple[str, str]: + """ + Validates user-specified output paths for `pixy` output. + + Args: + output_folder: the directory to which to write any `pixy` results + output_prefix: the combination of a given `output_folder` and `output_prefix` + + Raises: + Exception, if the output folder is not writeable + Exception, if the output prefix contains slashes + + Returns: + output_folder and output_prefix + + """ + # attempt to create the output folder + Path(output_folder).mkdir(parents=True, exist_ok=True) + + # check if output folder is writable + # if not os.access(re.sub(r"[^\/]+$", "", args.outfile_prefix), os.W_OK): + if not os.access(output_folder, os.W_OK): + raise OSError(f"The output folder {output_folder} is not writable") + + # check if output_prefix is correctly specified + if "/" in str(output_prefix) or "\\" in str(output_prefix): + raise ValueError( + f"The output prefix {output_prefix} contains slashes. " + f"Remove them and specify output folder structure " + "with --output_folder if necessary." + ) + if output_folder != "": + output_folder = output_folder + "/" + else: + output_folder = os.path.expanduser(os.getcwd() + "/") + output_prefix = output_folder + output_prefix + + return output_folder, output_prefix + + +def get_chrom_list(args: argparse.Namespace) -> List[str]: + """ + Get the list of chromosomes for analysis. + + If `--chromosomes all` is specified, this will be all chromosomes in the provided VCF. + Otherwise, it will be the list of chromosomes provided to `--chromosomes`. + + Raises: + Exception: If any chromosomes specified are not found in the VCF. + """ + # get the list of all chromosomes in the dataset + chrom_all = subprocess.check_output("tabix -l " + args.vcf, shell=True).decode("utf-8").split() + if args.chromosomes != "all": + # If a subset of chromosomes were specified, limit our analysis to those, and ensure that + # they are all present in the VCF + chrom_list = list(str(args.chromosomes).split(",")) + + missing = list(set(chrom_list) - set(chrom_all)) + if len(missing) > 0: + raise ValueError( + f"The following chromosomes were specified but do not occur in the VCF: {missing}" + ) + + else: + # Otherwise return everything in the VCF + chrom_list = chrom_all + + return chrom_list + + +def validate_window_and_interval_args(args: argparse.Namespace) -> str: + """ + Validate the window and interval arguments when a BED file is not provided. + + Args: + args: The parsed command-line arguments. + + Returns: + A "check message", which is "OK" if all conditions are met, or a "WARNING" if the specified + interval is smaller than the specified window size. + """ + assert args.bed_file is None, ( + "this function should only be invoked when a BED file is not specified" + ) + logger: logging.Logger = logging.getLogger(__name__) + check_message: str = "OK" + + if args.window_size is None: + raise ValueError("In the absence of a BED file, a --window_size must be specified.") + + if args.interval_start is None and args.interval_end is not None: + raise ValueError( + "When specifying an interval, both --interval_start and --interval_end are required." + ) + + if args.interval_start is not None and args.interval_end is None: + raise ValueError( + "When specifying an interval, both --interval_start and --interval_end are required." + ) + + chrom_list: List[str] = get_chrom_list(args) + if (args.interval_start is not None or args.interval_end is not None) and len(chrom_list) > 1: + raise ValueError( + "--interval_start and --interval_end are not valid when calculating over " + "multiple chromosomes. Remove both arguments or specify a single chromosome." + ) + + if (args.interval_start is not None and args.interval_end is not None) and ( + (int(args.interval_end) - int(args.interval_start)) <= int(args.window_size) + ): + check_message = "WARNING" + logger.warning( + f"The specified interval {args.interval_start}-{args.interval_stop} " + f"is smaller than the window size ({args.window_size}). " + "A single window will be returned." + ) + + return check_message + + +def check_and_validate_args( # noqa: C901 + args: argparse.Namespace, +) -> PixyArgs: + """ + Checks whether user-specific arguments are valid. + + Args: + args: parsed CLI args specified by the user + + Raises: + Exception: if the output_folder is not writeable + Exception: if any of the required input files are missing + Exception: if the output_prefix contains either forward or backward slashes + Exception: if any of the provided files do not exist + Exception: if the VCF file is not compressed with `bgzip` or indexed with `tabix` + Exception: if the VCF file does not contain variant sites + Exception: if the provided `--chromosomes` do not occur in the specified VCF file + Exception: if neither a `--bed-file` nor a `--window_size` is provided + Exception: if only one of `--interval_start` and `--interval_end` is given + Exception: if multiple `--chromosomes` and an interval are provided + Exception: if any rows in either the `--bed-file` or `--sites-path` are missing data + Exception: if any of the samples provided in the `populations_file` do not exist in the VCF + Exception: if the `populations_file` does not contain at least 2 populations + + Returns: + an instance of PixyArgs where each attribute is validated from user-specified input + + """ + # CHECK FOR TABIX + tabix_path = shutil.which("tabix") + logger = logging.getLogger(__name__) + if tabix_path is None: + raise ValueError( + "`tabix` is not installed (or cannot be located in the path). " + 'Install tabix with "conda install -c bioconda htslib".' + ) + + if args.vcf is None: + raise ValueError(f"The --vcf argument is missing or incorrectly specified: {args.vcf}") + + if args.populations is None: + raise ValueError( + f"The --populations argument is missing or incorrectly specified: {args.populations}." + ) + + # reformat file paths for compatibility + + populations_path: Path = Path(os.path.expanduser(args.populations)) + populations_df: pandas.DataFrame = validate_populations_path(populations_path) + + vcf_path: str = os.path.expanduser(args.vcf) # we don't want a Path object just yet because + # most of the downstream operations require a string + validate_vcf_path(vcf_path) + + if args.output_folder != "": + output_folder = args.output_folder + "/" + else: + output_folder = os.path.expanduser(os.getcwd() + "/") + + output_prefix = output_folder + args.output_prefix + + # get vcf header info + vcf_headers = allel.read_vcf_headers(vcf_path) + + logger.info("Validating VCF and input parameters...") + + # CHECK OUTPUT FOLDER + logger.info("Checking write access...") + + output_folder, output_prefix = validate_output_path( + output_folder=args.output_folder, output_prefix=args.output_prefix + ) + + # CHECK CPU CONFIGURATION + logger.info("Checking CPU configuration...") + check_message = "OK" + + if args.n_cores > mp.cpu_count(): + logger.warning( + f"{args.n_cores} CPU cores requested but only {mp.cpu_count()} available. " + f"Using {mp.cpu_count()} cores." + ) + args.n_cores = mp.cpu_count() + + # CHECK FOR EXISTENCE OF INPUT FILES + + if args.bed_file is not None: + bed_path: Path = Path(os.path.expanduser(args.bed_file)) + bed_df: pandas.DataFrame = validate_bed_path(bed_path) + + else: + bed_df = None + + # VALIDATE THE VCF + + # check if the vcf contains any invariant sites + # a very basic check: just looks for at least one invariant site in the alt field + logger.info("Checking for invariant sites...") + check_message = "OK" + + if args.bypass_invariant_check == "no": + alt_list = ( + subprocess.check_output( + "gunzip -c " + + vcf_path + + " | grep -v '#' | head -n 100000 | awk '{print $5}' | sort | uniq", + shell=True, + ) + .decode("utf-8") + .split() + ) + if "." not in alt_list: + raise ValueError( + "The provided VCF appears to contain no invariant sites " + '(ALT = "."). ' + "This check can be bypassed via --bypass_invariant_check 'yes'." + ) + if "." in alt_list and len(alt_list) == 1: + logger.warning( + "The provided VCF appears to contain no variable sites in the " + "first 100 000 sites. It may have been filtered incorrectly, or genetic diversity " + "may be extremely low. " + "This warning can be suppressed via --bypass_invariant_check 'yes'.'" + ) + else: + if not (len(args.stats) == 1 and (args.stats[0] == "fst")): + logger.warning( + "EXTREME WARNING: --bypass_invariant_check is set to 'yes'. Note that a " + "lack of invariant sites will result in incorrect estimates." + ) + + # check if requested chromosomes exist in vcf + # parses the whole CHROM column (!) + + logger.info("Checking chromosome data...") + + chrom_list: List[str] = get_chrom_list(args) + + # INTERVALS + # check if intervals are correctly specified + # validate the BED file (if present) + + logger.info("Checking intervals/sites...") + check_message = "OK" + + if args.bed_file is None: + check_message = validate_window_and_interval_args(args) + logger.info(check_message) + else: + if ( + args.interval_start is not None + or args.interval_end is not None + or args.window_size is not None + ): + raise ValueError( + "--interval_start, --interval_end, and --window_size are not valid " + "when a BED file of windows is provided." + ) + + bed_chrom: List[str] = list(bed_df["chrom"]) + missing = list(set(bed_chrom) - set(chrom_list)) + chrom_list = list(set(chrom_list) & set(bed_chrom)) + + if len(missing) > 0: + logger.warning( + "The following chromosomes are in the BED file but do not occur in the VCF " + f"and will be ignored: {missing}" + ) + if args.sites_file is None: + sites_df = None + chrom_sites: List[str] = [] + missing_sites: List[str] = [] + + else: + sites_path: Path = Path(os.path.expanduser(args.sites_file)) + sites_df = validate_sites_path(sites_path=sites_path) + + # all the chromosomes in the sites file + chrom_sites = list(sites_df["CHROM"]) + + # the difference between the chromosomes in the sites file and the VCF + missing_sites = list(set(chrom_sites) - set(chrom_list)) + + if len(missing_sites) > 0: + logger.warning( + "The following chromosomes occur in the sites file but do not occur in the " + f"VCF and will be ignored: {missing_sites}" + ) + + # SAMPLES + # check if requested samples exist in vcf + + logger.info("Checking sample data...") + + # - parse + validate the population file + # - format is IND POP (tab separated) + # - throws an error if individuals are missing from VCF + + # get a list of samples from the callset + samples_list = vcf_headers.samples + # make sure every indiv in the pop file is in the VCF callset + sample_ids: List[str] = list(populations_df["ID"]) + missing = list(set(sample_ids) - set(samples_list)) + + # find the samples in the callset index by matching up the order of samples between the + # population file and the callset + # also check if there are invalid samples in the popfile + try: + samples_callset_index = [samples_list.index(s) for s in populations_df["ID"]] + except ValueError as e: + raise ValueError( + f"The following samples are listed in the population file but not in the VCF: {missing}" + ) from e + else: + populations_df["callset_index"] = samples_callset_index + + # use the popindices dictionary to keep track of the indices for each population + popindices = {} + popnames = populations_df.Population.unique() + for name in popnames: + popindices[name] = populations_df[ + populations_df.Population == name + ].callset_index.values + + if (populations_df["Population"].nunique() == 1) and ( + "fst" in args.stats or "dxy" in args.stats + ): + raise ValueError( + "Calculation of fst and/or dxy requires at least two populations to be " + "defined in the population file." + ) + + logger.info("All initial checks passed!") + stats: List[PixyStat] = [PixyStat[stat.upper()] for stat in args.stats] + tmp_path: Path = _generate_tmp_path(output_dir=output_folder) + _check_tmp_path(tmp_path) + return PixyArgs( + stats=stats, + vcf_path=Path(vcf_path), + populations_df=populations_df, + num_cores=args.n_cores, + bypass_invariant_check=args.bypass_invariant_check, + bed_df=bed_df, + output_dir=Path(output_folder), + output_prefix=output_prefix, + chromosomes=chrom_list, + window_size=args.window_size, + interval_start=args.interval_start, + interval_end=args.interval_end, + sites_df=sites_df, + chunk_size=args.chunk_size, + fst_type=FSTEstimator[args.fst_type.upper()], + temp_file=tmp_path, + ) + + +def _generate_tmp_path(output_dir: str) -> Path: + """Generates a temporary file path to which `pixy` will write intermediate results.""" + return Path(output_dir) / f"pixy_tmpfile_{uuid.uuid4().hex}.tmp" + + +def _check_tmp_path(temp_file: Path) -> None: + # check if temp file is writable + with open(temp_file, "w"): + pass # file is created and then closed + assert os.access(temp_file, os.W_OK), "temp file is not writable" diff --git a/pixy/calc.py b/pixy/calc.py index 64a045c..604ed1b 100644 --- a/pixy/calc.py +++ b/pixy/calc.py @@ -1,13 +1,20 @@ import typing -from typing import Tuple, Union, List, Any +from typing import Tuple, Union, List from allel import AlleleCountsArray, GenotypeArray import allel import numpy as np +from numpy.typing import NDArray from scipy import special +from pixy.models import NA +from pixy.models import DxyResult +from pixy.models import FstResult +from pixy.models import PiResult +from pixy.enums import FSTEstimator + # vectorized functions for calculating pi and dxy # these are reimplementations of the original functions @@ -38,7 +45,7 @@ def count_diff_comp_missing(row: AlleleCountsArray, n_haps: int) -> Tuple[int, i # function for vectorized calculation of pi from a pre-filtered scikit-allel genotype matrix -def calc_pi(gt_array: GenotypeArray) -> Tuple[Union[float, str], int, int, int]: +def calc_pi(gt_array: GenotypeArray) -> PiResult: """Given a filtered genotype matrix, calculate `pi`. Args: @@ -78,20 +85,20 @@ def calc_pi(gt_array: GenotypeArray) -> Tuple[Union[float, str], int, int, int]: # if there are valid data (comparisons between genotypes) at the site, compute average dxy # otherwise return NA - avg_pi: Union[float, str] - if total_comps > 0: - avg_pi = total_diffs / total_comps - else: - avg_pi = "NA" + avg_pi: Union[float, NA] = total_diffs / total_comps if total_comps > 0 else "NA" - return (avg_pi, total_diffs, total_comps, total_missing) + return PiResult( + avg_pi=avg_pi, + total_diffs=total_diffs, + total_comps=total_comps, + total_missing=total_missing, + ) # function for vectorized calculation of dxy from a pre-filtered scikit-allel genotype matrix -def calc_dxy( - pop1_gt_array: GenotypeArray, pop2_gt_array: GenotypeArray -) -> Tuple[Union[float, str], int, int, int]: - """Given a filtered genotype matrix, calculate `dxy`. +def calc_dxy(pop1_gt_array: GenotypeArray, pop2_gt_array: GenotypeArray) -> DxyResult: + """ + Given a filtered genotype matrix, calculate `dxy`. Args: pop1_gt_array: the GenotypeArray representing population-specific allele counts @@ -103,7 +110,6 @@ def calc_dxy( total_diffs: sum of the number of differences between the populations total_comps: sum of the number of comparisons between the populations total_missing: sum of the number of missing between the populations - """ # the counts of each of the two alleles in each population at each site @@ -134,13 +140,14 @@ def calc_dxy( # if there are valid data (comparisons between genotypes) at the site, compute average dxy # otherwise return NA - avg_dxy: Union[float, str] - if total_comps > 0: - avg_dxy = total_diffs / total_comps - else: - avg_dxy = "NA" + avg_dxy: Union[float, NA] = total_diffs / total_comps if total_comps > 0 else "NA" - return (avg_dxy, total_diffs, total_comps, total_missing) + return DxyResult( + avg_dxy=avg_dxy, + total_diffs=total_diffs, + total_comps=total_comps, + total_missing=total_missing, + ) # function for obtaining fst AND variance components via scikit allel function @@ -149,8 +156,8 @@ def calc_dxy( # in aggregation mode, we just want a,b,c and n_sites for aggregating and fst @typing.no_type_check def calc_fst( - gt_array_fst: GenotypeArray, fst_pop_indicies: List[List[int]], fst_type: str -) -> Tuple[Union[float, str], float, float, Union[float, int], int]: + gt_array_fst: GenotypeArray, fst_pop_indicies: List[List[int]], fst_type: FSTEstimator +) -> FstResult: # TODO: update the return type here after refactoring (2 -> 1 return statements) """Calculates FST according to either Weir and Cockerham (1984) or Hudson (1992). @@ -185,6 +192,9 @@ def calc_fst( a: List[float] b: List[float] c: Union[List[float], int] + + result: FstResult + # WC 84 if fst_type == "wc": a, b, c = allel.weir_cockerham_fst(gt_array_fst, subpops=fst_pop_indicies) @@ -201,8 +211,7 @@ def calc_fst( else: fst = "NA" - return (fst, a, b, c, n_sites) - # TODO: can't get mypy to recognize multiple return statements of slightly different structures? + result = FstResult(fst=fst, a=a, b=b, c=c, n_sites=n_sites) # Hudson 92 if fst_type == "hudson": @@ -228,8 +237,9 @@ def calc_fst( # same abc format as WC84, where 'a' is the numerator and # 'b' is the demoninator, and 'c' is a zero placeholder - return (fst, num, den, c, n_sites) - # TODO: mypy thinks this function is missing a return statement + result = FstResult(fst=fst, a=num, b=den, c=c, n_sites=n_sites) + + return result # simplified version of above to handle the case @@ -237,8 +247,10 @@ def calc_fst( def calc_fst_persite( - gt_array_fst: GenotypeArray, fst_pop_indicies: List[List[int]], fst_type: str -) -> Any: + gt_array_fst: GenotypeArray, + fst_pop_indicies: List[List[int]], + fst_type: str, +) -> NDArray[np.float64]: """Calculates site-specific FST according to Weir and Cockerham (1984) or Hudson (1992). Args: @@ -252,6 +264,7 @@ def calc_fst_persite( """ # compute basic (multisite) FST via scikit allel + fst: NDArray[np.float64] # WC 84 if fst_type == "wc": @@ -259,9 +272,6 @@ def calc_fst_persite( fst = np.sum(a, axis=1) / (np.sum(a, axis=1) + np.sum(b, axis=1) + np.sum(c, axis=1)) - return fst - # TODO: fix nested returns - # Hudson 92 elif fst_type == "hudson": # following scikit allel docs @@ -274,4 +284,7 @@ def calc_fst_persite( fst = num / den - return fst + else: + raise ValueError("fst_type must be either 'wc' or 'hudson'") + + return fst diff --git a/pixy/core.py b/pixy/core.py index 2445a02..cd0ecd6 100644 --- a/pixy/core.py +++ b/pixy/core.py @@ -1,36 +1,43 @@ +import argparse +import os +import shutil +import subprocess import typing -from typing import Tuple, Optional, List, Dict, Union +import uuid +from itertools import combinations +from pathlib import Path +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union import allel -import numcodecs import numpy as np -import sys -import os -import subprocess -import re import pandas -import operator -import warnings -import time -import pathlib -import argparse -import shutil -import uuid - -from allel import GenotypeArray, SortedIndex - -import pixy.calc - -from multiprocess import Pool +from allel import GenotypeArray +from allel import GenotypeVector +from allel import SortedIndex from multiprocess import Queue -import multiprocess as mp -from scipy import special -from itertools import combinations -from collections import Counter +import pixy.calc +from pixy.args_validation import get_chrom_list +from pixy.args_validation import validate_bed_path +from pixy.args_validation import validate_sites_path +from pixy.args_validation import validate_populations_path +from pixy.args_validation import validate_vcf_path +from pixy.args_validation import validate_output_path +from pixy.args_validation import validate_window_and_interval_args +from pixy.enums import FSTEstimator +from pixy.models import PixyTempResult +from pixy.args_validation import validate_bed_path +from pixy.stats.summary import precompute_filtered_variant_array +from pixy.stats.summary import compute_summary_pi +from pixy.stats.summary import compute_summary_dxy +from pixy.stats.summary import compute_summary_fst +from pixy.enums import FSTEstimator -# function for re-aggregating subwindows from the temporary output def aggregate_output( df_for_stat: pandas.DataFrame, stat: str, @@ -38,7 +45,8 @@ def aggregate_output( window_size: int, fst_type: str, ) -> pandas.DataFrame: - """Aggregates genomic data into windows and computes summary statistics over each window. + """ + Aggregates genomic data into windows and computes summary statistics over each window. Summary statistics could be one or more of pi (genetic diversity within a population), dxy (genetic diversity between populations), or fst (proportion of the total genetic variance across a subpopulation). @@ -53,8 +61,8 @@ def aggregate_output( chromosome: The name of the chromosome for the current window. window_size: The size of the genomic windows (in base pairs) for aggregation. fst_type: The method for calculating fst. One of: - - 'wc' for Wright's fst, - - 'hudson' for Hudson's fst. + - 'wc' for Weir and Cockerham's fst + - 'hudson' for Hudson's fst Raises: -- @@ -63,7 +71,6 @@ def aggregate_output( outsorted: a pandas.DataFrame that stores aggregated statistics for each window """ - outsorted = df_for_stat.sort_values([4]) # sort by position interval_start = df_for_stat[4].min() interval_end = df_for_stat[4].max() @@ -165,7 +172,8 @@ def assign_subwindows_to_windows( # goal here to reduce I/O bottlenecks by only doing a single read of # the VCF for a group of small windows def assign_windows_to_chunks(window_pre_list: List[List[int]], chunk_size: int) -> List[List[int]]: - """Assigns windows to larger chunks. + """ + Assigns windows to larger chunks. Windows that overlap multiple chunks (based on start and stop positions) will be corraled into the first chunk. @@ -210,7 +218,8 @@ def assign_windows_to_chunks(window_pre_list: List[List[int]], chunk_size: int) # function for assinging sites to larger chunks def assign_sites_to_chunks(sites_pre_list: List[int], chunk_size: int) -> List[List[int]]: - """Assigns each site in a list to a chunk based on its position and given chunk size. + """ + Assigns each site in a list to a chunk based on its position and given chunk size. The chunk index for each site is determined by dividing the site position by `(chunk_size + 1)` and using `np.floor` to calculate the chunk index. This @@ -245,7 +254,8 @@ def assign_sites_to_chunks(sites_pre_list: List[int], chunk_size: int) -> List[L def mask_non_target_sites( gt_array: GenotypeArray, pos_array: SortedIndex, sites_list_chunk: List[int] ) -> GenotypeArray: - """Masks non-target sites in a genotype array. + """ + Masks non-target sites in a genotype array. Masked sites are set to missing data (`-1`). @@ -398,7 +408,8 @@ def compute_summary_stats( aggregate: bool, window_size: int, ) -> None: - """Calculates summary statistics and writes them out to a temp file. + """ + Calculates summary statistics and writes them out to a temp file. Args: args: user-specified command-line arguments @@ -421,6 +432,8 @@ def compute_summary_stats( None """ + pixy_output: List[PixyTempResult] = [] + # read in the genotype data for the chunk callset_is_none, gt_array, pos_array = read_and_filter_genotypes( args, chromosome, chunk_pos_1, chunk_pos_2, sites_list_chunk @@ -428,87 +441,33 @@ def compute_summary_stats( # if computing FST, pre-compute a filtered array of variants (only) if "fst" in args.stats: - if (not callset_is_none) and (args.populations is not None) and (len(gt_array) != 0): - # compute allel freqs - allele_counts = gt_array.count_alleles() - allele_freqs = allele_counts.to_frequencies() - - # remove invariant/polyallelic sites - variants_array = [len(x) == 2 and x[0] < 1 for x in allele_freqs] - - # filter gt and position arrays for biallelic variable sites - gt_array_fst = gt_array[variants_array] - pos_array_fst = pos_array[variants_array] + per_site_fst_results, gt_array_fst, pos_array_fst = precompute_filtered_variant_array( + args=args, + gt_array=gt_array, + pos_array=pos_array, + callset_is_none=callset_is_none, + window_size=window_size, + popindices=popindices, + chromosome=chromosome, + ) - else: - pos_array_fst = None - gt_array_fst = None - - # if obtaining per-site estimates, - # compute the FST values for the whole chunk - # instead of looping over subwindows (below) - - if window_size == 1: - # determine all the possible population pairings - pop_names = list(popindices.keys()) - fst_pop_list = list(combinations(pop_names, 2)) - pixy_output: str = "" - # for each pair, compute fst using the filtered gt_array - for pop_pair in fst_pop_list: - # the indices for the individuals in each population - fst_pop_indicies = [ - popindices[pop_pair[0]].tolist(), - popindices[pop_pair[1]].tolist(), - ] - - # compute FST - # windowed_weir_cockerham_fst seems to generate (spurious?) warnings about div/0, so suppressing warnings - # (this assumes that the scikit-allel function is working as intended) - np.seterr(divide="ignore", invalid="ignore") - - # if the genotype matrix is not empty, compute FST - # other wise return NA - - if not callset_is_none and gt_array_fst is not None and len(gt_array_fst) > 0: - fst = pixy.calc.calc_fst_persite(gt_array_fst, fst_pop_indicies, args.fst_type) - window_positions = list(zip(pos_array_fst, pos_array_fst)) - n_snps = [1] * len(pos_array_fst) - - for fst, wind, snps in zip(fst, window_positions, n_snps): - # append trailing NAs so that pi/dxy/fst have the same # of columns - pixy_result = ( - "fst" - + "\t" - + str(pop_pair[0]) - + "\t" - + str(pop_pair[1]) - + "\t" - + str(chromosome) - + "\t" - + str(wind[0]) - + "\t" - + str(wind[1]) - + "\t" - + str(fst) - + "\t" - + str(snps) - + "\tNA\tNA\tNA" - ) - - if "pixy_output" in locals(): - pixy_output = pixy_output + "\n" + pixy_result - else: - pixy_output = pixy_result + if per_site_fst_results is not None: + pixy_output.extend(per_site_fst_results) # loop over the windows within the chunk and compute summary stats for window_index in range(0, len(window_list_chunk)): window_pos_1 = window_list_chunk[window_index][0] window_pos_2 = window_list_chunk[window_index][1] + window_is_empty: bool + gt_region: Union[GenotypeVector, None] + if pos_array is None: window_is_empty = True + gt_region = None elif len(pos_array[(pos_array >= window_pos_1) & (pos_array <= window_pos_2)]) == 0: window_is_empty = True + gt_region = None else: window_is_empty = False # pull out the genotypes for the window @@ -533,779 +492,66 @@ def compute_summary_stats( # AVERAGE NUCLEOTIDE DIFFERENCES WITHIN POPULATIONS if (args.populations is not None) and ("pi" in args.stats): - for pop in popnames: - # if the window has no sites in the VCF, assign all NAs, - # otherwise calculate pi - if window_is_empty: - avg_pi, total_diffs, total_comps, total_missing, no_sites = ( - "NA", - "NA", - "NA", - "NA", - 0, - ) - else: - # subset the window for the individuals in each population - gt_pop = gt_region.take(popindices[pop], axis=1) - - # if the population specific window for this region is empty, report it as such - if len(gt_pop) == 0: - avg_pi, total_diffs, total_comps, total_missing, no_sites = ( - "NA", - "NA", - "NA", - "NA", - 0, - ) - - # otherise compute pi as normal - else: - # number of sites genotyped in the population - # not directly used in the calculation - no_sites = np.count_nonzero(np.sum(gt_pop.count_alleles(max_allele=1), 1)) - avg_pi, total_diffs, total_comps, total_missing = pixy.calc.calc_pi(gt_pop) - - # create a string of the pi results to write to file - # klk added NA so that pi/dxy/fst have the same # of columns - pixy_result = ( - "pi" - + "\t" - + str(pop) - + "\tNA\t" - + str(chromosome) - + "\t" - + str(window_pos_1) - + "\t" - + str(window_pos_2) - + "\t" - + str(avg_pi) - + "\t" - + str(no_sites) - + "\t" - + str(total_diffs) - + "\t" - + str(total_comps) - + "\t" - + str(total_missing) - ) - - # append the result to the multiline output string - if "pixy_output" in locals(): - pixy_output = pixy_output + "\n" + pixy_result - else: - pixy_output = pixy_result + pi_results: List[PixyTempResult] = compute_summary_pi( + popnames=popnames, + window_is_empty=window_is_empty, + gt_region=gt_region, + popindices=popindices, + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + ) + + pixy_output.extend(pi_results) # DXY: # AVERAGE NUCLEOTIDE DIFFERENCES BETWEEN POPULATIONS if (args.populations is not None) and ("dxy" in args.stats): # create a list of all pairwise comparisons between populations in the popfile - dxy_pop_list = list(combinations(popnames, 2)) - - # interate over all population pairs and compute dxy - for pop_pair in dxy_pop_list: - pop1 = pop_pair[0] - pop2 = pop_pair[1] - - if window_is_empty: - avg_dxy, total_diffs, total_comps, total_missing, no_sites = ( - "NA", - "NA", - "NA", - "NA", - 0, - ) - - else: - # use the popGTs dictionary to keep track of this region's GTs for each population - popGTs = {} - for name in pop_pair: - gt_pop = gt_region.take(popindices[name], axis=1) - popGTs[name] = gt_pop - - pop1_gt_region = popGTs[pop1] - pop2_gt_region = popGTs[pop2] - - # if either of the two population specific windows for this region are empty, report it missing - if len(pop1_gt_region) == 0 or len(pop2_gt_region) == 0: - avg_dxy, total_diffs, total_comps, total_missing, no_sites = ( - "NA", - "NA", - "NA", - "NA", - 0, - ) - - # otherwise compute dxy as normal - else: - # for number of sites (not used in calculation), report the - # number of sites that have at least one genotype in BOTH populations - pop1_sites = np.sum(pop1_gt_region.count_alleles(max_allele=1), 1) > 0 - pop2_sites = np.sum(pop2_gt_region.count_alleles(max_allele=1), 1) > 0 - no_sites = np.sum(np.logical_and(pop1_sites, pop2_sites)) - avg_dxy, total_diffs, total_comps, total_missing = pixy.calc.calc_dxy( - pop1_gt_region, pop2_gt_region - ) - - # create a string of for the dxy results - pixy_result = ( - "dxy" - + "\t" - + str(pop1) - + "\t" - + str(pop2) - + "\t" - + str(chromosome) - + "\t" - + str(window_pos_1) - + "\t" - + str(window_pos_2) - + "\t" - + str(avg_dxy) - + "\t" - + str(no_sites) - + "\t" - + str(total_diffs) - + "\t" - + str(total_comps) - + "\t" - + str(total_missing) - ) - - # append the result to the multiline output string - if "pixy_output" in locals(): - pixy_output = pixy_output + "\n" + pixy_result - else: - pixy_output = pixy_result + dxy_results: List[PixyTempResult] = compute_summary_dxy( + popnames=popnames, + window_is_empty=window_is_empty, + gt_region=gt_region, + popindices=popindices, + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + ) + + pixy_output.extend(dxy_results) # FST: # WEIR AND COCKERHAMS FST # This is just a loose wrapper around the scikit-allel fst function # TBD: explicit fst when data is completely missing - a: Union[float, int, str] if (args.populations is not None) and ("fst" in args.stats) and window_size != 1: - # check for valid sites in the FST (variant only) data position array - if pos_array_fst is not None: - if np.logical_and( - pos_array_fst >= window_pos_1, pos_array_fst <= window_pos_2 - ).any(): - # if there are valid sites, determine all the possible population pairings - pop_names = list(popindices.keys()) - fst_pop_list = list(combinations(pop_names, 2)) - - # for each pair, compute fst using the filtered gt_array - for pop_pair in fst_pop_list: - # the indices for the individuals in each population - fst_pop_indicies = [ - popindices[pop_pair[0]].tolist(), - popindices[pop_pair[1]].tolist(), - ] - - # compute FST - # windowed_weir_cockerham_fst seems to generate (spurious?) warnings about div/0, so suppressing warnings - # (this assumes that the scikit-allel function is working as intended) - np.seterr(divide="ignore", invalid="ignore") - - # if the genotype matrix is not empty, compute FST - # other wise return NA - - if ( - not callset_is_none - and gt_array_fst is not None - and len(gt_array_fst) > 0 - and not window_is_empty - ): - # compute an ad-hoc window size - fst_window_size = window_pos_2 - window_pos_1 - - # otherwise, compute FST using the scikit-allel window fst functions or the pixy "direct" method if aggregating - if not aggregate: - if args.fst_type == "wc": - fst, window_positions, n_snps = ( - allel.windowed_weir_cockerham_fst( - pos_array_fst, - gt_array_fst, - subpops=fst_pop_indicies, - size=fst_window_size, - start=window_pos_1, - stop=window_pos_2, - ) - ) - - if args.fst_type == "hudson": - ac1 = gt_array_fst.count_alleles(subpop=fst_pop_indicies[0]) - ac2 = gt_array_fst.count_alleles(subpop=fst_pop_indicies[1]) - fst, window_positions, n_snps = allel.windowed_hudson_fst( - pos_array_fst, - ac1, - ac2, - size=fst_window_size, - start=window_pos_1, - stop=window_pos_2, - ) - else: - fst, a, b, c, n_snps = pixy.calc.calc_fst( - gt_array_fst, fst_pop_indicies, args.fst_type - ) - window_positions = [[window_pos_1, window_pos_2]] - - else: - # if there are no variable sites in the window, output NA/0 - # edit: i think it actually makes more sense to just omit these sites - if not aggregate: - fst, window_positions, n_snps = ( - ["NA"], - [[window_pos_1, window_pos_2]], - [0], - ) - else: - fst, window_positions, n_snps, a, b, c = ( - "NA", - [[window_pos_1, window_pos_2]], - 0, - "NA", - "NA", - "NA", - ) - - # create an output string for the FST results - - # print(fst) - # print(window_positions) - # print(n_snps) - if aggregate: - pixy_result = ( - "fst" - + "\t" - + str(pop_pair[0]) - + "\t" - + str(pop_pair[1]) - + "\t" - + str(chromosome) - + "\t" - + str(window_pos_1) - + "\t" - + str(window_pos_2) - + "\t" - + str(fst) - + "\t" - + str(n_snps) - + "\t" - + str(a) - + "\t" - + str(b) - + "\t" - + str(c) - ) - - else: - for fst, wind, snps in zip(fst, window_positions, n_snps): - # append trailing NAs so that pi/dxy/fst have the same # of columns - pixy_result = ( - "fst" - + "\t" - + str(pop_pair[0]) - + "\t" - + str(pop_pair[1]) - + "\t" - + str(chromosome) - + "\t" - + str(wind[0]) - + "\t" - + str(wind[1]) - + "\t" - + str(fst) - + "\t" - + str(snps) - + "\tNA\tNA\tNA" - ) - - # append the result to the multiline output string - - if "pixy_output" in locals(): - pixy_output = pixy_output + "\n" + pixy_result - - else: - pixy_output = pixy_result - - # else: - # # if there are no variable sites in the window, output NA/0 - # # edit: i think it actually makes more sense to just omit these sites - # if not aggregate: - # fst, window_positions, n_snps = ["NA"],[[window_pos_1,window_pos_2]],[0] - # pixy_result = "fst" + "\t" + str(pop_pair[0]) + "\t" + str(pop_pair[1]) + "\t" + str(chromosome) + "\t" + str(wind[0]) + "\t" + str(wind[1]) + "\t" + str(fst) + "\t" + str(snps)+ "\tNA\tNA\tNA" - # else: - # fst, window_positions, n_snps, a, b, c = "NA",[[window_pos_1,window_pos_2]], 0, "NA", "NA", "NA" - # pixy_result = "fst" + "\t" + str(pop_pair[0]) + "\t" + str(pop_pair[1]) + "\t" + str(chromosome) + "\t" + str(window_pos_1) + "\t" + str(window_pos_2) + "\t" + str(fst) + "\t" + str(n_snps)+ "\t" + str(a) + "\t" + str(b) +"\t" + str(c) - # - # if 'pixy_output' in locals(): - # pixy_output = pixy_output + "\n" + pixy_result - # - # else: - # pixy_output = pixy_result + fst_type: FSTEstimator = FSTEstimator(args.fst_type) + fst_results: List[PixyTempResult] = compute_summary_fst( + fst_type=fst_type, + gt_array_fst=gt_array_fst, + pos_array_fst=pos_array_fst, + window_is_empty=window_is_empty, + callset_is_none=callset_is_none, + aggregate=aggregate, + popindices=popindices, + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + ) + + pixy_output.extend(fst_results) # OUTPUT # if in mc mode, put the results in the writing queue # otherwise just write to file # ensure the output variable exists in some form - - if "pixy_output" in locals(): + if pixy_output: + temp_pixy_content: str = "\n".join(f"{result}" for result in pixy_output) if args.n_cores > 1: - q.put(pixy_output) - + q.put(temp_pixy_content) elif args.n_cores == 1: outfile = open(temp_file, "a") - outfile.write(pixy_output + "\n") + outfile.write(temp_pixy_content + "\n") outfile.close() - - -# function for checking & validating command line arguments -# when problems are detected, throws an error/warning + message - - -@typing.no_type_check -def check_and_validate_args( - args: argparse.Namespace, -) -> Tuple[ - np.ndarray, - Dict[str, np.ndarray], - List[str], - List[str], - str, - str, - str, - pandas.DataFrame, - pandas.DataFrame, -]: - """Checks whether user-specific arguments are valid. - - Args: - args: parsed CLI args specified by the user - - Raises: - Exception: if the output_folder is not writeable - Exception: if any of the required input files are missing - Exception: if the output_prefix contains either forward or backward slashes - Exception: if any of the provided files do not exist - Exception: if the VCF file is not compressed with `bgzip` or indexed with `tabix` - Exception: if the VCF file does not contain variant sites - Exception: if the provided `--chromosomes` do not occur in the specified VCF file - Exception: if neither a `--bed-file` nor a `--window_size` is provided - Exception: if only one of `--interval_start` and `--interval_end` is given - Exception: if multiple `--chromosomes` and an interval are provided - Exception: if any rows in either the `--bed-file` or `--sites-path` are missing data - Exception: if any of the samples provided in the `populations_file` do not exist in the VCF - Exception: if the `populations_file` does not contain at least 2 populations - - Returns: - popnames: unique population names derived from `--populations` file - popindices: the indices of the population-specific samples within the provided VCF - chrom_list: list of chromosomes that are provided and also found in the provided VCF - IDs: list of each individual in the `--populations` file - temp_file: a temporary file in which to hold in-flight `pixy` calculations - output_folder: the directory to which to write any `pixy` results - output_prefix: the combination of a given `output_folder` and `output_prefix` - bed_df: a pandas.DataFrame that represents a given `--bed-file` (empty list if none given) - sites_df: a pandas.DataFrame that represents a given `--sites-file` (empty list if none given) - - """ - # CHECK FOR TABIX - tabix_path = shutil.which("tabix") - - if tabix_path is None: - raise Exception( - '[pixy] ERROR: tabix is not installed (or cannot be located in the path). Install tabix with "conda install -c bioconda htslib".' - ) - - if args.vcf is None: - raise Exception("[pixy] ERROR: The --vcf argument is missing or incorrectly specified.") - - if args.populations is None: - raise Exception( - "[pixy] ERROR: The --populations argument is missing or incorrectly specified." - ) - - # reformat file paths for compatibility - args.vcf = os.path.expanduser(args.vcf) - args.populations = os.path.expanduser(args.populations) - - if args.output_folder != "": - output_folder = args.output_folder + "/" - else: - output_folder = os.path.expanduser(os.getcwd() + "/") - - output_prefix = output_folder + args.output_prefix - - # get vcf header info - vcf_headers = allel.read_vcf_headers(args.vcf) - - print("\n[pixy] Validating VCF and input parameters...") - - # CHECK OUTPUT FOLDER - print("[pixy] Checking write access...", end="") - check_message = "OK" - - # attempt to create the output folder - if os.path.exists(output_folder) is not True: - os.makedirs(output_folder) - - # check if output folder is writable - # if not os.access(re.sub(r"[^\/]+$", "", args.outfile_prefix), os.W_OK): - if not os.access(output_folder, os.W_OK): - raise Exception("[pixy] ERROR: The output folder " + output_folder + " is not writable") - - # check if output_prefix is correctly specified - if "/" in str(args.output_prefix) or "\\" in str(args.output_prefix): - raise Exception( - "[pixy] ERROR: The output prefix '" - + str(args.output_prefix) - + "' contains slashes. Remove them and specify output folder structure with --output_folder if necessary." - ) - - # generate a name for a unique temp file for collecting output - temp_file = output_folder + "pixy_tmpfile_" + str(uuid.uuid4().hex) + ".tmp" - - # check if temp file is writable - with open(temp_file, "w"): - pass - - if check_message == "OK": - print(check_message) - - # CHECK CPU CONFIGURATION - print("[pixy] Checking CPU configuration...", end="") - check_message = "OK" - - if args.n_cores > mp.cpu_count(): - check_message = "WARNING" - print(check_message) - print( - "[pixy] WARNING: " - + str(args.n_cores) - + " CPU cores requested but only " - + str(mp.cpu_count()) - + " are available. Using " - + str(mp.cpu_count()) - + "." - ) - args.n_cores = mp.cpu_count() - - if check_message == "OK": - print(check_message) - - # CHECK FOR EXISTANCE OF INPUT FILES - - if os.path.exists(args.vcf) is not True: - raise Exception("[pixy] ERROR: The specified VCF " + str(args.vcf) + " does not exist") - - if not re.search(".gz", args.vcf): - raise Exception( - '[pixy] ERROR: The vcf is not compressed with bgzip (or has no .gz extension). To fix this, run "bgzip [filename].vcf" first (and then index with "tabix [filename].vcf.gz" if necessary)' - ) - - if not (os.path.exists(args.vcf + ".tbi") or os.path.exists(args.vcf + ".csi")): - raise Exception( - '[pixy] ERROR: The vcf is not indexed. Please either use `tabix` or `bcftools` to produce a `.tbi` or `.csi` index.' - ) - - if os.path.exists(args.populations) is not True: - raise Exception( - "[pixy] ERROR: The specified populations file " - + str(args.populations) - + " does not exist" - ) - - if args.bed_file is not None: - args.bed_file = os.path.expanduser(args.bed_file) - - if os.path.exists(args.bed_file) is not True: - raise Exception( - "[pixy] ERROR: The specified BED file " + str(args.bed_file) + " does not exist" - ) - - else: - bed_df = [] - - if args.sites_file is not None: - args.sites_file = os.path.expanduser(args.sites_file) - - if os.path.exists(args.sites_file) is not True: - raise Exception( - "[pixy] ERROR: The specified sites file " + str(args.sites_file) + " does not exist" - ) - else: - sites_df: pandas.DataFrame = [] - - # VALIDATE THE VCF - - # check if the vcf contains any invariant sites - # a very basic check: just looks for at least one invariant site in the alt field - print("[pixy] Checking for invariant sites...", end="") - check_message = "OK" - - if args.bypass_invariant_check == "no": - alt_list = ( - subprocess.check_output( - "gunzip -c " - + args.vcf - + " | grep -v '#' | head -n 100000 | awk '{print $5}' | sort | uniq", - shell=True, - ) - .decode("utf-8") - .split() - ) - if "." not in alt_list: - raise Exception( - "[pixy] ERROR: the provided VCF appears to contain no invariant sites (ALT = \".\"). This check can be bypassed via --bypass_invariant_check 'yes'." - ) - if "." in alt_list and len(alt_list) == 1: - check_message = "WARNING" - print( - "[pixy] WARNING: the provided VCF appears to contain no variable sites in the first 100 000 sites. It may have been filtered incorrectly, or genetic diversity may be extremely low. This warning can be suppressed via --bypass_invariant_check 'yes'.'" - ) - else: - if not (len(args.stats) == 1 and (args.stats[0] == "fst")): - check_message = "WARNING" - print(check_message) - print( - "[pixy] EXTREME WARNING: --bypass_invariant_check is set to 'yes'. Note that a lack of invariant sites will result in incorrect estimates." - ) - - if check_message == "OK": - print(check_message) - - # check if requested chromosomes exist in vcf - # parses the whole CHROM column (!) - - print("[pixy] Checking chromosome data...", end="") - - # get the list of all chromosomes in the dataset - chrom_all = subprocess.check_output("tabix -l " + args.vcf, shell=True).decode("utf-8").split() - - if args.chromosomes != "all": - chrom_list = list(args.chromosomes.split(",")) - # pretabix method, can remove - # chrom_all = subprocess.check_output("gunzip -c " + args.vcf + " | grep -v '#' | awk '{print $1}' | uniq", shell=True).decode("utf-8").split() - chrom_all = ( - subprocess.check_output("tabix -l " + args.vcf, shell=True).decode("utf-8").split() - ) - missing = list(set(chrom_list) - set(chrom_all)) - if len(missing) > 0: - raise Exception( - "[pixy] ERROR: the following chromosomes were specified but not occur in the VCF: ", - missing, - ) - - else: # added this else statement (klk) - chrom_list = ( - subprocess.check_output("tabix -l " + args.vcf, shell=True).decode("utf-8").split() - ) - chrom_all = chrom_list - - print("OK") - - # INTERVALS - # check if intervals are correctly specified - # validate the BED file (if present) - - print("[pixy] Checking intervals/sites...", end="") - check_message = "OK" - - if args.bed_file is None: - if args.window_size is None: - raise Exception( - "[pixy] ERROR: In the absence of a BED file, a --window_size must be specified." - ) - - if args.interval_start is None and args.interval_end is not None: - raise Exception( - "[pixy] ERROR: When specifying an interval, both --interval_start and --interval_end are required." - ) - - if args.interval_start is not None and args.interval_end is None: - raise Exception( - "[pixy] ERROR: When specifying an interval, both --interval_start and --interval_end are required." - ) - - if (args.interval_start is not None or args.interval_end is not None) and len( - chrom_list - ) > 1: - raise Exception( - "[pixy] ERROR: --interval_start and --interval_end are not valid when calculating over multiple chromosomes. Remove both arguments or specify a single chromosome." - ) - - if (args.interval_start is not None and args.interval_end is not None) and ( - (int(args.interval_end) - int(args.interval_start)) <= int(args.window_size) - ): - check_message = "WARNING" - print( - "[pixy] WARNING: The specified interval " - + str(args.interval_start) - + "-" - + str(args.interval_end) - + " is smaller than the window size (" - + str(args.window_size) - + "). A single window will be returned." - ) - - else: - if ( - args.interval_start is not None - or args.interval_end is not None - or args.window_size is not None - ): - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: --interval_start, --interval_end, and --window_size are not valid when a BED file of windows is provided." - ) - - # read in the bed file and extract the chromosome column - bed_df = pandas.read_csv( - args.bed_file, sep="\t", usecols=[0, 1, 2], names=["chrom", "pos1", "pos2"] - ) - bed_df["chrom"] = bed_df["chrom"].astype(str) - - # force chromosomes to strings - - if bed_df.isnull().values.any(): - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: your bed file contains missing data, confirm all rows have three fields (chrom, pos1, pos2)." - ) - - if len(bed_df.columns) != 3: - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: The bed file has the wrong number of columns (should be 3, is " - + str(len(bed_df.columns)) - + ")" - ) - - else: - bed_df.columns = ["chrom", "chromStart", "chromEnd"] - bed_chrom: List[str] = list(bed_df["chrom"]) - missing = list(set(bed_chrom) - set(chrom_all)) - chrom_all = list(set(chrom_all) & set(bed_chrom)) - chrom_list = list(set(chrom_all) & set(bed_chrom)) - - if len(missing) > 0: - check_message = "WARNING" - print(check_message) - print( - "[pixy] WARNING: the following chromosomes in the BED file do not occur in the VCF and will be ignored: " - + str(missing) - ) - - if args.sites_file is not None: - sites_df = pandas.read_csv( - args.sites_file, sep="\t", usecols=[0, 1], names=["chrom", "pos"] - ) - sites_df["chrom"] = sites_df["chrom"].astype(str) - - if sites_df.isnull().values.any(): - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: your sites file contains missing data, confirm all rows have two fields (chrom, pos)." - ) - - if len(sites_df.columns) != 2: - raise Exception( - "[pixy] ERROR: The sites file has the wrong number of columns (should be 2, is " - + str(len(sites_df.columns)) - + ")" - ) - - else: - sites_df.columns = ["CHROM", "POS"] - chrom_sites = list(sites_df["CHROM"]) - missing = list(set(chrom_sites) - set(chrom_all)) - chrom_list = list(set(chrom_all) & set(chrom_sites)) - - if len(missing) > 0: - check_message = "WARNING" - print(check_message) - print( - "[pixy] WARNING: the following chromosomes in the sites file do not occur in the VCF and will be ignored: " - + str(missing) - ) - - if check_message == "OK": - print(check_message) - - # SAMPLES - # check if requested samples exist in vcf - - print("[pixy] Checking sample data...", end="") - - # - parse + validate the population file - # - format is IND POP (tab separated) - # - throws an error if individuals are missing from VCF - - # read in the list of samples/populations - poppanel = pandas.read_csv( - args.populations, sep="\t", usecols=[0, 1], names=["ID", "Population"] - ) - poppanel["ID"] = poppanel["ID"].astype(str) - - # check for missing values - - if poppanel.isnull().values.any(): - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: your populations file contains missing data, confirm all samples have population IDs (and vice versa)." - ) - - # get a list of samples from the callset - samples_list = vcf_headers.samples - - # make sure every indiv in the pop file is in the VCF callset - IDs = list(poppanel["ID"]) - missing = list(set(IDs) - set(samples_list)) - - # find the samples in the callset index by matching up the order of samples between the population file and the callset - # also check if there are invalid samples in the popfile - try: - samples_callset_index = [samples_list.index(s) for s in poppanel["ID"]] - except ValueError as e: - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: the following samples are listed in the population file but not in the VCF: ", - missing, - ) from e - else: - poppanel["callset_index"] = samples_callset_index - - # use the popindices dictionary to keep track of the indices for each population - popindices = {} - popnames = poppanel.Population.unique() - for name in popnames: - popindices[name] = poppanel[poppanel.Population == name].callset_index.values - - if len(popnames) == 1 and ("fst" in args.stats or "dxy" in args.stats): - check_message = "ERROR" - print(check_message) - raise Exception( - "[pixy] ERROR: calcuation of fst and/or dxy requires at least two populations to be defined in the population file." - ) - - print("OK") - print("[pixy] All initial checks past!") - - return ( - popnames, - popindices, - chrom_list, - IDs, - temp_file, - output_folder, - output_prefix, - bed_df, - sites_df, - ) diff --git a/pixy/enums.py b/pixy/enums.py new file mode 100644 index 0000000..8c26929 --- /dev/null +++ b/pixy/enums.py @@ -0,0 +1,40 @@ +from enum import Enum +from enum import unique + + +@unique +class PixyStat(Enum): + """ + The genetic variance statistics that `pixy` can calculate. + + Attributes: + PI: a measure of genetic diversity _within_ populations + DXY: a measure of genetic diversity _between_ populations + FST: the "fixation index"; represents a subpopulation-specific genetic variance + relative to the total genetic variance + + """ + + PI = "pi" + DXY = "dxy" + FST = "fst" + + def __str__(self) -> str: + return self.value + + +@unique +class FSTEstimator(Enum): + """ + The mutually exclusive FST estimators that `pixy` offers. + + Attributes: + WC: model from Weir and Cockerham 1984 + HUDSON: model from Hudson 1992 and Bhatia et al. 2013 + """ + + WC = "wc" + HUDSON = "hudson" + + def __str__(self) -> str: + return self.value diff --git a/pixy/models.py b/pixy/models.py new file mode 100644 index 0000000..3c916f0 --- /dev/null +++ b/pixy/models.py @@ -0,0 +1,125 @@ +from dataclasses import dataclass +from dataclasses import fields +from typing import Literal +from typing import Union + +from typing_extensions import TypeAlias + +from pixy.args_validation import PixyStat + +NA: TypeAlias = Literal["NA"] + + +@dataclass(frozen=True) +class PixyTempResult: + """ + Stores temporary `pixy` results. + + Currently, `pixy` computes results for all requested statistics ("pi", "dxy", or "fst") + and writes them to the same temporary file before splitting this temp file into the final + per-statistic output files. + + This dataclass represents a row in this temporary file. + + Attributes: + pixy_stat: the genetic variance statistic that `pixy` calculated + (one of "pi", "dxy", or "fst") + population_1: one of two populations being compared (found in the `populations_path` file) + population_2: the other of two populations being compared + chromosome: the chromosome of interest + window_pos_1: the start position of the window in which `pixy` is calculating variance + window_pos_2: the end position of the window in which `pixy` is calculating variance + calculated_stat: the result of the calculation of the `pixy_stat` + shared_sites_with_alleles: the sites at which both populations have alleles + total_differences: the total number of differences between populations summed across all + sites + total_comparisons: the total number of pairwise comparisons between sites + total_missing: equal to (total count of possible pairwise comparisons at all sites + - total_comparisons) + + `population_1` and `population_2` will both be populated for "dxy" and "fst" results. + `population_2` will be "NA" when reporting "pi" results. + + """ + + pixy_stat: PixyStat + population_1: str + population_2: Union[str, NA] + chromosome: str + window_pos_1: int + window_pos_2: int + calculated_stat: Union[float, NA] + shared_sites_with_alleles: int + total_differences: Union[int, float, NA] + total_comparisons: Union[int, float, NA] + total_missing: Union[int, float, NA] + + def __str__(self) -> str: + """Returns a tab-delimited string representation for writing out to the temp file.""" + return "\t".join(str(getattr(self, field.name)) for field in fields(self)) + + +@dataclass +class PiResult: + """A result from calculating pi.""" + + avg_pi: Union[float, NA] + total_diffs: Union[int, NA] + total_comps: Union[int, NA] + total_missing: Union[int, NA] + + @classmethod + def empty(cls) -> "PiResult": + """An empty PiResult, with all fields set to NA.""" + return cls(avg_pi="NA", total_diffs="NA", total_comps="NA", total_missing="NA") + + +@dataclass +class DxyResult: + """A result from calculating dxy.""" + + avg_dxy: Union[float, NA] + total_diffs: Union[int, NA] + total_comps: Union[int, NA] + total_missing: Union[int, NA] + + @classmethod + def empty(cls) -> "DxyResult": + """An empty DxyResult, with all fields set to NA.""" + return cls(avg_dxy="NA", total_diffs="NA", total_comps="NA", total_missing="NA") + + +@dataclass +class FstResult: + """ + A result from calculating fst. + + If `fst_type` is `wc`, the following will be returned: + fst: "NA" if no valid data + a: variance between populations + b: variance between individuals within populations + c: variance between gametes within individuals + n_sites: the number of sites over which variance was measured + + If `fst_type` is `hudson`, the following will be returned: + fst: "NA" if no valid data. + num: divergence between the two populations minus average of diversity within each + population + den: divergence between the two populations + c: a placeholder of 0 to maintain the shape of the return Tuple + n_sites: the number of sites over which variance was measured + + For simplicity and historical consistency, when `fst_type` is `hudson`, the `a` and `b` fields + contain the `num` and `den` values. + """ + + fst: Union[float, NA] + a: Union[float, NA] + b: Union[float, NA] + c: Union[float, int, NA] + n_sites: int + + @classmethod + def empty(cls) -> "FstResult": + """An empty FstResult, with all fields set to NA.""" + return cls(fst="NA", a="NA", b="NA", c="NA", n_sites=0) diff --git a/pixy/stats/__init__.py b/pixy/stats/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pixy/stats/summary.py b/pixy/stats/summary.py new file mode 100644 index 0000000..26fac58 --- /dev/null +++ b/pixy/stats/summary.py @@ -0,0 +1,487 @@ +"""Compute summary statistics.""" + +import argparse +from itertools import combinations +from typing import Dict +from typing import List +from typing import Tuple +from typing import Union +from typing import cast + +import numpy as np +from allel import AlleleCountsArray +from allel import GenotypeArray +from allel import GenotypeVector +from allel import SortedIndex +from allel import windowed_hudson_fst +from allel import windowed_weir_cockerham_fst +from numpy.typing import NDArray + +from pixy.args_validation import PixyStat +from pixy.calc import calc_dxy +from pixy.calc import calc_fst +from pixy.calc import calc_fst_persite +from pixy.calc import calc_pi +from pixy.enums import FSTEstimator +from pixy.models import DxyResult +from pixy.models import FstResult +from pixy.models import PiResult +from pixy.models import PixyTempResult + + +def precompute_filtered_variant_array( + args: argparse.Namespace, + gt_array: GenotypeArray, + pos_array: SortedIndex, + callset_is_none: bool, + window_size: int, + popindices: Dict[str, NDArray[np.int64]], + chromosome: str, +) -> Tuple[List[PixyTempResult], Union[GenotypeArray, None], Union[SortedIndex, None]]: + """ + Pre-compute a filtered array of variants for FST calculation. + + Args: + args: The pixy command-line arguments. + gt_array: Filtered genotypes ingested from the input VCF. + pos_array: Positions of each genotype in `gt_array`. + callset_is_none: True if the input VCF contained no variants in the region of interest. + window_size: The size (in bp) of the window over which to calculate pixy statistics. + popindices: Indices of the population-specific samples within the input VCF. + chromosome: The chromosome of interest. + + Returns: + A tuple (per_site_fst_results, gt_array_fst, pos_array_fst), where `gt_array_fst` and + `pos_array_fst` contain the subset of variants in `gt_array` and `pos_array` which are + biallelic. These objects are `None` if the input VCF contained no variants in the region of + interest (i.e. `callset_is_none` is True or `gt_array` is empty), or when a `populations` + file was not specified in the command-line arguments. + + **Per-site FST results are only computed by this function when `window_size` is set to 1.** + When `window_size=1`, `per_site_fst_results` is a list of per-site FST values for all of the + filtered sites in `gt_array_fst`. Otherwise, it is empty. + """ + gt_array_fst: Union[GenotypeArray, None] + pos_array_fst: Union[SortedIndex, None] + + if (not callset_is_none) and (args.populations is not None) and (len(gt_array) != 0): + # compute allel freqs + allele_counts: AlleleCountsArray = gt_array.count_alleles() + allele_freqs: NDArray[np.float64] = allele_counts.to_frequencies() + + # remove invariant/polyallelic sites + variants_array: List[bool] = [len(x) == 2 and x[0] < 1 for x in allele_freqs] + + # filter gt and position arrays for biallelic variable sites + # NB: we are masking `gt_array` and `pos_array` with the filtered variants. Indexing either + # `GenotypeArray` or `SortedIndex` with a boolean array returns a subset of the initial + # object, but scikit-allel doesn't type this appropriately, so the cast is needed. + gt_array_fst = cast(GenotypeArray, gt_array[variants_array]) + pos_array_fst = cast(SortedIndex, pos_array[variants_array]) + + else: + gt_array_fst = None + pos_array_fst = None + + # if obtaining per-site estimates, + # compute the FST values for the whole chunk + # instead of looping over subwindows (below) + pixy_results: List[PixyTempResult] = [] + + # The original code only computed the per-site FST if the window size were 1 + # TODO extract and conslidate the FST-calculations + # https://github.com/fulcrumgenomics/pixy-dev/issues/47 + if window_size != 1: + return pixy_results, gt_array_fst, pos_array_fst + + # determine all the possible population pairings + pop_names: List[str] = list(popindices.keys()) + fst_pop_list: List[Tuple[str, str]] = list(combinations(pop_names, 2)) + + # for each pair, compute fst using the filtered gt_array + for pop_pair in fst_pop_list: + # the indices for the individuals in each population + fst_pop_indicies: List[List[int]] = [ + popindices[pop_pair[0]].tolist(), + popindices[pop_pair[1]].tolist(), + ] + + # compute FST + # windowed_weir_cockerham_fst seems to generate (spurious?) warnings about div/0, so + # suppressing warnings (this assumes that the scikit-allel function is working as + # intended) + # TODO: verify these are indeed spurious + # https://github.com/fulcrumgenomics/pixy-dev/issues/48 + np.seterr(divide="ignore", invalid="ignore") + + # if the genotype matrix is not empty, compute FST + # otherwise return NA + if not callset_is_none and gt_array_fst is not None and len(gt_array_fst) > 0: + assert isinstance(pos_array_fst, SortedIndex), ( + "if gt_array_fst is not None, pos_array_fst should be not None as well" + ) + + per_site_fsts: NDArray[np.float64] = calc_fst_persite( + gt_array_fst, fst_pop_indicies, args.fst_type + ) + assert gt_array_fst.shape[0] == pos_array_fst.shape[0] == per_site_fsts.shape[0], ( + "the genotype, position, and FST arrays should have the same length" + ) + + snps: int = 1 + + for i, fst in enumerate(per_site_fsts): + # append trailing NAs so that pi/dxy/fst have the same # of columns + pixy_result: PixyTempResult = PixyTempResult( + pixy_stat=PixyStat.FST, + population_1=pop_pair[0], + population_2=pop_pair[1], + chromosome=chromosome, + window_pos_1=pos_array_fst[i], + window_pos_2=pos_array_fst[i], + calculated_stat=fst, + shared_sites_with_alleles=snps, + total_differences="NA", + total_missing="NA", + total_comparisons="NA", + ) + + pixy_results.append(pixy_result) + + return pixy_results, gt_array_fst, pos_array_fst + + +def compute_summary_pi( + popnames: NDArray[np.string_], + window_is_empty: bool, + gt_region: Union[GenotypeVector, None], + popindices: Dict[str, NDArray[np.int64]], + chromosome: str, + window_pos_1: int, + window_pos_2: int, +) -> List[PixyTempResult]: + """Compute pi for all populations in the specified window.""" + pixy_results: List[PixyTempResult] = [] + + for pop in popnames: + pi_result: PiResult + no_sites: int + # if the window has no sites in the VCF, assign all NAs, + # otherwise calculate pi + if window_is_empty: + pi_result = PiResult.empty() + no_sites = 0 + else: + # NB: in the original implementation this branch appears to be unreachable when + # `gt_region` is None, the value error is included as a type narrower + if gt_region is None: + raise ValueError("gt_region is None") + + # subset the window for the individuals in each population + gt_pop = gt_region.take(popindices[pop], axis=1) + + # if the population specific window for this region is empty, report it as such + if len(gt_pop) == 0: + pi_result = PiResult.empty() + no_sites = 0 + + # otherwise compute pi as normal + else: + # number of sites genotyped in the population + # not directly used in the calculation + no_sites = np.count_nonzero(np.sum(gt_pop.count_alleles(max_allele=1), 1)) + pi_result = calc_pi(gt_pop) + + # create a `PixyTempResult` composed of pi results to write to file + pixy_result: PixyTempResult = PixyTempResult( + pixy_stat=PixyStat.PI, + population_1=pop, + population_2="NA", + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + calculated_stat=pi_result.avg_pi, + shared_sites_with_alleles=no_sites, + total_differences=pi_result.total_diffs, + total_comparisons=pi_result.total_comps, + total_missing=pi_result.total_missing, + ) + + # append the result to list of `PixyTempResult` objects + pixy_results.append(pixy_result) + + return pixy_results + + +def compute_summary_dxy( + popnames: NDArray[np.string_], + window_is_empty: bool, + gt_region: Union[GenotypeVector, None], + popindices: Dict[str, NDArray[np.int64]], + chromosome: str, + window_pos_1: int, + window_pos_2: int, +) -> List[PixyTempResult]: + """Compute dxy for all pairwise combinations of populations in the specified window.""" + pixy_results: List[PixyTempResult] = [] + + dxy_pop_list = list(combinations(popnames, 2)) + + # iterate over all population pairs and compute dxy + for pop_pair in dxy_pop_list: + pop1 = pop_pair[0] + pop2 = pop_pair[1] + + dxy_result: DxyResult + no_sites: int + if window_is_empty: + dxy_result = DxyResult.empty() + no_sites = 0 + + else: + # NB: in the original implementation this branch appears to be unreachable when + # `gt_region` is None, the value error is included as a type narrower + if gt_region is None: + raise ValueError("gt_region is None") + + # use the pop_gts dictionary to keep track of this region's GTs for each population + pop_gts = {} + for name in pop_pair: + gt_pop = gt_region.take(popindices[name], axis=1) + pop_gts[name] = gt_pop + + pop1_gt_region = pop_gts[pop1] + pop2_gt_region = pop_gts[pop2] + + # if either of the two population-specific windows for this region are empty, + # report it missing + if len(pop1_gt_region) == 0 or len(pop2_gt_region) == 0: + dxy_result = DxyResult.empty() + no_sites = 0 + + # otherwise compute dxy as normal + else: + # for number of sites (not used in calculation), report the + # number of sites that have at least one genotype in BOTH populations + pop1_sites = np.sum(pop1_gt_region.count_alleles(max_allele=1), 1) > 0 + pop2_sites = np.sum(pop2_gt_region.count_alleles(max_allele=1), 1) > 0 + no_sites = np.sum(np.logical_and(pop1_sites, pop2_sites)) + dxy_result = calc_dxy(pop1_gt_array=pop1_gt_region, pop2_gt_array=pop2_gt_region) + + # create a string of for the dxy results + pixy_result: PixyTempResult = PixyTempResult( + pixy_stat=PixyStat.DXY, + population_1=pop1, + population_2=pop2, + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + calculated_stat=dxy_result.avg_dxy, + shared_sites_with_alleles=no_sites, + total_differences=dxy_result.total_diffs, + total_comparisons=dxy_result.total_comps, + total_missing=dxy_result.total_missing, + ) + + # append the result to list of `PixyTempResult` objects + pixy_results.append(pixy_result) + + return pixy_results + + +def compute_summary_fst( + fst_type: FSTEstimator, + gt_array_fst: GenotypeArray, + pos_array_fst: Union[SortedIndex, None], + window_is_empty: bool, + callset_is_none: bool, + aggregate: bool, + popindices: Dict[str, NDArray[np.int64]], + chromosome: str, + window_pos_1: int, + window_pos_2: int, +) -> List[PixyTempResult]: + """Compute fst for all filtered sites.""" + pixy_results: List[PixyTempResult] = [] + + # If there are no valid sites, exit early + if pos_array_fst is None: + return pixy_results + + # If there are no valid sites in the current window, exit early + if not np.logical_and(pos_array_fst >= window_pos_1, pos_array_fst <= window_pos_2).any(): + return pixy_results + + # if there are valid sites, determine all the possible population pairings + pop_names = list(popindices.keys()) + fst_pop_list = list(combinations(pop_names, 2)) + + # for each pair, compute fst using the filtered gt_array + for pop_pair in fst_pop_list: + # the indices for the individuals in each population + fst_pop_indicies = [ + popindices[pop_pair[0]].tolist(), + popindices[pop_pair[1]].tolist(), + ] + + # compute FST + # windowed_weir_cockerham_fst seems to generate (spurious?) warnings about div/0, so + # suppressing warnings (this assumes that the scikit-allel function is working as + # intended) + # TODO: verify these are indeed spurious + # https://github.com/fulcrumgenomics/pixy-dev/issues/48 + np.seterr(divide="ignore", invalid="ignore") + + # if the genotype matrix is not empty, compute FST + # other wise return NA + gt_matrix_is_empty: bool = not ( + not callset_is_none + and gt_array_fst is not None + and len(gt_array_fst) > 0 + and not window_is_empty + ) + + fst_results: List[PixyTempResult] + if aggregate: + fst_results = _compute_aggregate_fst_for_pair( + gt_matrix_is_empty=gt_matrix_is_empty, + fst_type=fst_type, + gt_array_fst=gt_array_fst, + fst_pop_indicies=fst_pop_indicies, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + pop_pair=pop_pair, + chromosome=chromosome, + ) + else: + fst_results = _compute_individual_fst_for_pair( + gt_matrix_is_empty=gt_matrix_is_empty, + fst_type=fst_type, + pos_array_fst=pos_array_fst, + gt_array_fst=gt_array_fst, + fst_pop_indicies=fst_pop_indicies, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + pop_pair=pop_pair, + chromosome=chromosome, + ) + + pixy_results.extend(fst_results) + + return pixy_results + + +def _compute_aggregate_fst_for_pair( + gt_matrix_is_empty: bool, + fst_type: FSTEstimator, + gt_array_fst: GenotypeArray, + fst_pop_indicies: List[List[int]], + window_pos_1: int, + window_pos_2: int, + pop_pair: Tuple[str, str], + chromosome: str, +) -> List[PixyTempResult]: + """Compute aggregate FST for a pair of populations.""" + result: FstResult + if gt_matrix_is_empty: + result = FstResult.empty() + else: + result = calc_fst(gt_array_fst, fst_pop_indicies, fst_type) + + pixy_result = PixyTempResult( + pixy_stat=PixyStat.FST, + population_1=pop_pair[0], + population_2=pop_pair[1], + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + calculated_stat=result.fst, + shared_sites_with_alleles=result.n_sites, + total_differences=result.a, + total_comparisons=result.b, + total_missing=result.c, + ) + + pixy_results = [pixy_result] + + return pixy_results + + +def _compute_individual_fst_for_pair( + gt_matrix_is_empty: bool, + fst_type: FSTEstimator, + pos_array_fst: SortedIndex, + gt_array_fst: GenotypeArray, + fst_pop_indicies: List[List[int]], + window_pos_1: int, + window_pos_2: int, + pop_pair: Tuple[str, str], + chromosome: str, +) -> List[PixyTempResult]: + """Compte individual FST for a pair of populations.""" + if gt_matrix_is_empty: + pixy_result = PixyTempResult( + pixy_stat=PixyStat.FST, + population_1=pop_pair[0], + population_2=pop_pair[1], + chromosome=chromosome, + window_pos_1=window_pos_1, + window_pos_2=window_pos_2, + calculated_stat="NA", + shared_sites_with_alleles=0, + total_differences="NA", + total_comparisons="NA", + total_missing="NA", + ) + + return [pixy_result] + + pixy_results: List[PixyTempResult] = [] + + # compute an ad-hoc window size + fst_window_size = window_pos_2 - window_pos_1 + + if fst_type is FSTEstimator.WC: + fst, window_positions, n_snps = windowed_weir_cockerham_fst( + pos_array_fst, + gt_array_fst, + subpops=fst_pop_indicies, + size=fst_window_size, + start=window_pos_1, + stop=window_pos_2, + ) + + elif fst_type is FSTEstimator.HUDSON: + ac1 = gt_array_fst.count_alleles(subpop=fst_pop_indicies[0]) + ac2 = gt_array_fst.count_alleles(subpop=fst_pop_indicies[1]) + fst, window_positions, n_snps = windowed_hudson_fst( + pos_array_fst, + ac1, + ac2, + size=fst_window_size, + start=window_pos_1, + stop=window_pos_2, + ) + + else: + raise ValueError("unreachable") + + for fst_stat, wind, snps in zip(fst, window_positions, n_snps): + # append trailing NAs so that pi/dxy/fst have the same # of columns + pixy_result = PixyTempResult( + pixy_stat=PixyStat.FST, + population_1=pop_pair[0], + population_2=pop_pair[1], + chromosome=chromosome, + window_pos_1=wind[0], + window_pos_2=wind[1], + calculated_stat=fst_stat, + shared_sites_with_alleles=snps, + total_differences="NA", + total_comparisons="NA", + total_missing="NA", + ) + + pixy_results.append(pixy_result) + + return pixy_results diff --git a/pyproject.toml b/pyproject.toml index c218eb8..2f6050c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,7 @@ ignore = [ "D212", # summary line should be located on the same line as opening quotes "D100", # Missing docstring in public module "D104", # Missing docstring in public package + "D105", # Missing docstring in magic method ] unfixable = ["B"] diff --git a/tests/conftest.py b/tests/conftest.py index 829d446..70e0037 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -185,14 +185,23 @@ def run_pixy_helper( # noqa: C901 if sites_path is not None: test_args.extend(["--sites_file", f"{sites_path}"]) + # NB: we deliberately permit this helper to run with any type of args for the next 3 flags + # in order to test that `pixy` correctly raises an error in this case. + if bed_path is not None: test_args.extend(["--bed_file", f"{bed_path}"]) - # NB: we deliberately permit this helper to run with only one of the following - # two flags specified, in order to test that `pixy` correctly raises an error in this case. - if interval_start is not None: - test_args.extend((["--interval_start", f"{interval_start}"])) - if interval_end is not None: - test_args.extend((["--interval_end", f"{interval_end}"])) + else: + test_args.extend(["--bed_file"]) + + if interval_start is not None: + test_args.extend((["--interval_start", f"{interval_start}"])) + else: + test_args.extend((["--interval_start"])) + + if interval_end is not None: + test_args.extend((["--interval_end", f"{interval_end}"])) + else: + test_args.extend((["--interval_end"])) if output_prefix is not None: test_args.extend((["--output_prefix", f"{output_prefix}"])) @@ -205,7 +214,7 @@ def run_pixy_helper( # noqa: C901 if fst_type is not None: test_args.extend((["--fst_type", f"{fst_type}"])) - + print(f"test_args: {test_args}") with patch.object(sys, "argv", test_args): main() diff --git a/tests/main/test_main.py b/tests/main/test_main.py index 4c9d734..6ddcdd2 100644 --- a/tests/main/test_main.py +++ b/tests/main/test_main.py @@ -1,4 +1,5 @@ import filecmp +import logging import os import shutil from pathlib import Path @@ -17,50 +18,64 @@ @pytest.mark.parametrize( - "bed_path, window_size, interval_start, interval_stop, expected_error_msg", + "bed_str, window_size, chromosomes, interval_start, interval_end, expected_error_msg", [ ( "test_bed_path", 10000, + "all", None, None, - "ERROR: --interval_start, --interval_end, and --window_size", + "--interval_start, --interval_end, and --window_size", ), # too many args: bed_path and window_size ( "test_bed_path", 10000, + "all", 1, 20, - "ERROR: --interval_start, --interval_end, and --window_size", + "--interval_start, --interval_end, and --window_size", ), # too many args: bed_path, window_size, interval_start, and interval_stop ( None, None, + "all", None, None, - "ERROR: In the absence of a BED file", + "In the absence of a BED file", ), # no bed file and no window_size ( None, + 1, + "all", + 1, + 1, + "--interval_start and --interval_end are not valid when calculating over multiple", + ), # can't specify an interval over multiple chromosomes + ( None, 1, + "X", + 1, None, - "ERROR: When specifying an interval,", + "When specifying an interval,", ), # too few args: only `interval_start` provided ( None, - None, + 1, + "X", None, 1, - "ERROR: When specifying an interval,", + "When specifying an interval,", ), # too few args: only `interval_end` provided ], ) def test_missing_or_conflicting_args( - bed_path: str, + bed_str: Optional[str], window_size: Optional[int], + chromosomes: str, interval_start: Optional[int], - interval_stop: Optional[int], + interval_end: Optional[int], expected_error_msg: str, pixy_out_dir: Path, ag1000_vcf_path: Path, @@ -72,13 +87,18 @@ def test_missing_or_conflicting_args( `vcf_path` and `populations_path` stay the same here and are tested separately elsewhere. """ - with pytest.raises(Exception, match=expected_error_msg): + if bed_str is not None: + bed_path = request.getfixturevalue(bed_str) + else: + bed_path = None + with pytest.raises(ValueError, match=expected_error_msg): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], - bed_path=request.getfixturevalue(bed_path), + bed_path=bed_path, + chromosomes=chromosomes, interval_start=interval_start, - interval_end=interval_stop, + interval_end=interval_end, window_size=window_size, vcf_path=ag1000_vcf_path, populations_path=ag1000_pop_path, @@ -94,7 +114,7 @@ def test_vcf_missing_index( """Assert that we raise an exception when missing .tbi index.""" missing_index_vcf_path: Path = tmp_path / "ag1000_pixy_test.vcf.gz" shutil.copy(ag1000_vcf_path, missing_index_vcf_path) - with pytest.raises(Exception, match="ERROR: The vcf is not indexed."): + with pytest.raises(ValueError, match="The vcf is not indexed."): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -108,8 +128,8 @@ def test_missing_tabix_path_raises( pixy_out_dir: Path, ag1000_pop_path: Path, ag1000_vcf_path: Path ) -> None: """Assert that we raise an exception when `tabix` is not on the path.""" - with patch("pixy.core.shutil.which", return_value=None): - with pytest.raises(Exception, match="ERROR: tabix is not installed"): + with patch("pixy.args_validation.shutil.which", return_value=None): + with pytest.raises(ValueError, match="`tabix` is not installed"): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -126,7 +146,7 @@ def test_missing_vcf_file_raises(tmp_path: Path, pixy_out_dir: Path, ag1000_pop_ with open(vcf_path, "w") as f: f.write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\t\\INFO\tFORMAT") - with pytest.raises(Exception, match="ERROR: The vcf is not compressed"): + with pytest.raises(ValueError, match="The vcf is not compressed"): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -138,7 +158,7 @@ def test_missing_vcf_file_raises(tmp_path: Path, pixy_out_dir: Path, ag1000_pop_ def test_missing_pop_file_raises(pixy_out_dir: Path, ag1000_vcf_path: Path) -> None: """Assert that we raise an exception with a missing `populations_path`.""" - with pytest.raises(Exception, match="ERROR: The specified populations file"): + with pytest.raises(FileNotFoundError, match="The specified populations file"): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -152,7 +172,7 @@ def test_missing_chroms_in_vcf_raises( pixy_out_dir: Path, ag1000_vcf_path: Path, ag1000_pop_path: Path ) -> None: """Assert that we raise an exception when a given chromosome subset is not found in the VCF.""" - with pytest.raises(Exception, match="ERROR: the following chromosomes"): + with pytest.raises(ValueError, match="The following chromosomes"): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -170,7 +190,7 @@ def test_vcf_no_invariant_sites_raises( ag1000_invariant_vcf_path: Path, ) -> None: """Assert that we raise an error when a VCF contains no variable sites.""" - with pytest.raises(Exception, match="ERROR: the provided VCF appears to contain no invariant"): + with pytest.raises(ValueError, match="The provided VCF appears to contain no invariant"): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -190,15 +210,15 @@ def test_vcf_no_invariant_sites_raises( [ ( """ERS224670\tBFS\nERS224248\tBFS\nERS224089\t""", - "ERROR: your populations file contains missing data", + "The specified populations file contains missing data", ), # row missing 2nd column ( """ERS224670\tBFS\nERS224248\tBFS\n""", - "ERROR: calcuation of fst and/or dxy requires at least two", - ), # only 1 population here; `pixy` requires 2 populations for calcuation (sic) + "Calculation of fst and/or dxy requires at least two", + ), # only 1 population here; `pixy` requires 2 populations for calculation ( """INVALID_SAMPLE_ID_001\tXFS\nINVALID_SAMPLE_ID_002\tXFS\n""", - "ERROR: the following samples are listed", + "The following samples are listed", ), # samples in populations.txt are not in VCF ], ) @@ -213,7 +233,7 @@ def test_malformed_populations_files_raises( malformed_pop_path: Path = tmp_path / "malformed_pop_file.txt" with open(malformed_pop_path, "w") as f: f.write(malformed_populations_input) - with pytest.raises(Exception, match=expected_error_msg): + with pytest.raises(ValueError, match=expected_error_msg): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -228,7 +248,11 @@ def test_malformed_populations_files_raises( [ ( """X\t1\nX\t2\nX\t""", - "ERROR: your sites file contains missing data", + "The specified sites file contains missing data", + ), # row missing position + ( + """X\n1\n2\n3\n""", + "Too many columns specified: expected 2 and found 1", ), # row missing position ], ) @@ -248,8 +272,7 @@ def test_malformed_sites_file( malformed_site_path: Path = tmp_path / "malformed_sites_file.txt" with open(malformed_site_path, "w") as f: f.write(malformed_sites_input) - - with pytest.raises(Exception, match=expected_error_msg): + with pytest.raises(ValueError, match=expected_error_msg): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -265,7 +288,7 @@ def test_malformed_sites_file( [ ( """X\t1\t20\nX\t2\n""", - "ERROR: your bed file contains missing data", + "The specified BED file contains missing data", ), # row missing position ], ) @@ -286,7 +309,7 @@ def test_malformed_bed_file( with open(bed_path, "w") as f: f.write(malformed_bed_input) - with pytest.raises(Exception, match=expected_error_msg): + with pytest.raises(ValueError, match=expected_error_msg): run_pixy_helper( pixy_out_dir=pixy_out_dir, stats=["pi", "fst", "dxy"], @@ -306,12 +329,12 @@ def test_vcf_bed_chrom_difference_warns( pixy_out_dir: Path, ag1000_pop_path: Path, ag1000_vcf_path: Path, - capsys: pytest.CaptureFixture, + caplog: pytest.LogCaptureFixture, ) -> None: """Assert that chromosomes in .bed but not .vcf yields a warning.""" temp_dir = tmp_path_factory.mktemp("warn_files") bed_path: Path = temp_dir / "not_in_vcf.bed" - + caplog.set_level(logging.WARNING) with open(bed_path, "w") as f: f.write("""X\t1\t20\nX\t2\t30\n""") # X is in the VCF f.write("""23\t1\t20\n23\t2\t30\n""") # 23 is not in the VCF @@ -323,16 +346,17 @@ def test_vcf_bed_chrom_difference_warns( vcf_path=ag1000_vcf_path, populations_path=ag1000_pop_path, ) - out, _ = capsys.readouterr() - - assert "WARNING: the following chromosomes in the BED file" in out + assert ( + "The following chromosomes are in the BED file but do not occur in the VCF " + "and will be ignored: ['23']" in caplog.messages + ) def test_bypass_invariant_check_warns( pixy_out_dir: Path, ag1000_pop_path: Path, ag1000_vcf_path: Path, - capsys: pytest.CaptureFixture, + caplog: pytest.LogCaptureFixture, ) -> None: """Assert that `--bypass_invariant_check flag yields a warning.""" run_pixy_helper( @@ -343,8 +367,7 @@ def test_bypass_invariant_check_warns( populations_path=ag1000_pop_path, bypass_invariant_check="yes", ) - out, _ = capsys.readouterr() - assert "EXTREME WARNING: --bypass_invariant_check is set to 'yes'" in out + assert "EXTREME WARNING: --bypass_invariant_check is set to 'yes'" in caplog.text ################################################################################ @@ -598,7 +621,7 @@ def test_pixy_limited_sites_bed( expected_outputs: Path, ag1000_pop_path: Path, ag1000_vcf_path: Path, - capsys: pytest.CaptureFixture, + caplog: pytest.LogCaptureFixture, ) -> None: """ Test that `pixy` produces deterministic stats given static input data. @@ -616,13 +639,14 @@ def test_pixy_limited_sites_bed( debug=True, cores=1, ) - out, _ = capsys.readouterr() + expected_warnings: List[str] = [ + "The following chromosomes are in the BED file but do not occur in the VCF " + "and will be ignored: ['5']", + "The following chromosomes occur in the sites file but do not occur in the VCF " + "and will be ignored: ['B']", + ] - expected_warning: str = ( - "WARNING: the following chromosomes in the sites file do not occur in the VCF and will be " - "ignored: ['B']" - ) - assert expected_warning in out + assert set(expected_warnings) == set(caplog.messages) expected_out_files: List[Path] = [ Path("limited_sites_and_bed_dxy.txt"), diff --git a/tests/stats/test_models.py b/tests/stats/test_models.py new file mode 100644 index 0000000..51fe27a --- /dev/null +++ b/tests/stats/test_models.py @@ -0,0 +1,114 @@ +from typing import Union + +import pytest + +from pixy.args_validation import PixyStat +from pixy.models import NA +from pixy.models import PixyTempResult + + +@pytest.mark.parametrize( + "pixy_stat, pop1, pop2, chr, pos_1, pos_2, calculated_stat, shared, diffs, comps, missing," + "expected_str", + [ + ( + PixyStat.DXY, + "pop1", + "pop2", + "chr1", + 100, + 200, + 0.5, + 50, + 10, + 20, + 5, + "dxy\tpop1\tpop2\tchr1\t100\t200\t0.5\t50\t10\t20\t5", + ), # no `None` -> no NA expected + ( + PixyStat.FST.value, + "pop1", + "pop2", + "chr1", + 100, + 200, + 0.5, + 50, + "NA", + 20, + 5, + "fst\tpop1\tpop2\tchr1\t100\t200\t0.5\t50\tNA\t20\t5", + ), # `total_differences` is `None` + ( + PixyStat.FST, + "pop1", + "pop2", + "chr1", + 100, + 200, + 0.5, + 50, + 10, + "NA", + 5, + "fst\tpop1\tpop2\tchr1\t100\t200\t0.5\t50\t10\tNA\t5", + ), # `total_comparisons` is `None` + ( + PixyStat.FST, + "pop1", + "pop2", + "chr1", + 100, + 200, + 0.5, + 50, + 10, + 20, + "NA", + "fst\tpop1\tpop2\tchr1\t100\t200\t0.5\t50\t10\t20\tNA", + ), # `total_missing` is `None` + ( + PixyStat.PI, + "pop1", + "NA", + "chr1", + 100, + 200, + 0.5, + 50, + 10, + 20, + 5, + "pi\tpop1\tNA\tchr1\t100\t200\t0.5\t50\t10\t20\t5", + ), # `population_2` is `None` + ], +) +def test_pixy_result_str( + pixy_stat: PixyStat, + pop1: str, + pop2: str, + chr: str, + pos_1: int, + pos_2: int, + calculated_stat: float, + shared: int, + diffs: Union[int, NA], + comps: Union[int, NA], + missing: Union[int, NA], + expected_str: str, +) -> None: + """Assert that PixyResult.__str__() produces well-formed output.""" + generated_result: PixyTempResult = PixyTempResult( + pixy_stat=pixy_stat, + population_1=pop1, + population_2=pop2, + chromosome=chr, + window_pos_1=pos_1, + window_pos_2=pos_2, + calculated_stat=calculated_stat, + shared_sites_with_alleles=shared, + total_differences=diffs, + total_comparisons=comps, + total_missing=missing, + ) + assert str(generated_result) == expected_str