From a798e102434afc26567115b96c642559a66b75b9 Mon Sep 17 00:00:00 2001 From: Clint Valentine Date: Fri, 17 May 2024 13:43:21 -0700 Subject: [PATCH] feat: add a type-savvy .from_path() to BedWriter and BedReader (#17) --- README.md | 10 +-- bedspec/__init__.py | 1 + bedspec/_bedspec.py | 156 ++++++++++++++++++++++++++++++++++-------- tests/test_bedspec.py | 118 ++++++++++++++++++++++++++++++-- 4 files changed, 247 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 7b6880c..c103ec9 100644 --- a/README.md +++ b/README.md @@ -22,20 +22,22 @@ pip install bedspec ### Writing ```python -from bedspec import BedWriter, Bed3 +from bedspec import Bed3 +from bedspec import BedWriter bed = Bed3("chr1", start=2, end=8) -with BedWriter(open("test.bed", "w")) as writer: +with BedWriter[Bed3].from_path("test.bed") as writer: writer.write(bed) ``` ### Reading ```python -from bedspec import BedReader, Bed3 +from bedspec import Bed3 +from bedspec import BedReader -with BedReader[Bed3](open("test.bed")) as reader: +with BedReader[Bed3].from_path("test.bed") as reader: for bed in reader: print(bed) ``` diff --git a/bedspec/__init__.py b/bedspec/__init__.py index bdc8e23..f6525f5 100644 --- a/bedspec/__init__.py +++ b/bedspec/__init__.py @@ -12,6 +12,7 @@ from ._bedspec import BedStrand from ._bedspec import BedType from ._bedspec import BedWriter +from ._bedspec import Locatable from ._bedspec import PairBed from ._bedspec import PointBed from ._bedspec import SimpleBed diff --git a/bedspec/_bedspec.py b/bedspec/_bedspec.py index 42dbe5c..dda6013 100644 --- a/bedspec/_bedspec.py +++ b/bedspec/_bedspec.py @@ -1,7 +1,6 @@ import dataclasses import inspect import io -import typing from abc import ABC from abc import abstractmethod from dataclasses import asdict as as_dict @@ -9,18 +8,29 @@ from dataclasses import fields from enum import StrEnum from enum import unique +from functools import update_wrapper from pathlib import Path from types import FrameType from types import TracebackType +from types import UnionType from typing import Any +from typing import Callable from typing import ClassVar from typing import ContextManager from typing import Generic from typing import Iterable from typing import Iterator from typing import Protocol +from typing import Type from typing import TypeVar +from typing import Union +from typing import _BaseGenericAlias # type: ignore[attr-defined] +from typing import _GenericAlias # type: ignore[attr-defined] from typing import cast +from typing import get_args +from typing import get_origin +from typing import get_type_hints +from typing import runtime_checkable COMMENT_PREFIXES: set[str] = {"#", "browser", "track"} """The set of BED comment prefixes supported by this implementation.""" @@ -28,6 +38,68 @@ MISSING_FIELD: str = "." """The string used to indicate a missing field in a BED record.""" +BED_EXTENSION: str = ".bed" +"""The specification defined file extension for BED files.""" + +BEDPE_EXTENSION: str = ".bedpe" +"""The specification defined file extension for BedPE files.""" + + +def is_union(annotation: Type) -> bool: + """Test if we have a union type annotation or not.""" + return get_origin(annotation) in {Union, UnionType} + + +def is_optional(annotation: Type) -> bool: + """Return if this type annotation is optional (a union type with None) or not.""" + return is_union(annotation) and type(None) in get_args(annotation) + + +def singular_non_optional_type(annotation: Type) -> Type: + """Return the non-optional version of a singular type annotation.""" + if not is_optional(annotation): + return annotation + + not_none: list[Type] = [arg for arg in get_args(annotation) if arg is not type(None)] + if len(not_none) == 1: + return not_none[0] + else: + raise TypeError(f"Complex non-optional types are not supported! Found: {not_none}") + + +class MethodType: + def __init__(self, func: Callable, obj: object) -> None: + self.__func__ = func + self.__self__ = obj + + def __call__(self, *args: object, **kwargs: object) -> object: + func = self.__func__ + obj = self.__self__ + return func(obj, *args, **kwargs) + + +class classmethod_generic: + def __init__(self, f: Callable) -> None: + self.f = f + update_wrapper(self, f) + + def __get__(self, obj: object, cls: object | None = None) -> Callable: + if cls is None: + cls = type(obj) + method = MethodType(self.f, cls) + method._generic_classmethod = True # type: ignore[attr-defined] + return method + + +def __getattr__(self: object, name: str | None = None) -> object: + if hasattr(obj := orig_getattr(self, name), "_generic_classmethod"): + obj.__self__ = self + return obj + + +orig_getattr = _BaseGenericAlias.__getattr__ +_BaseGenericAlias.__getattr__ = __getattr__ + @unique class BedStrand(StrEnum): @@ -35,29 +107,37 @@ class BedStrand(StrEnum): POSITIVE = "+" NEGATIVE = "-" - UNKNOWN = MISSING_FIELD + + def opposite(self) -> "BedStrand": + """Return the opposite strand.""" + match self: + case BedStrand.POSITIVE: + return BedStrand.NEGATIVE + case BedStrand.NEGATIVE: + return BedStrand.POSITIVE +@dataclass class BedColor: """The color of a BED record in red, green, and blue values.""" - def __init__(self, r: int, g: int, b: int): - """Build a new BED color from red, green, and blue values.""" - self.r = r - self.g = g - self.b = b + r: int + g: int + b: int def __str__(self) -> str: """Return a string representation of this BED color.""" return f"{self.r},{self.g},{self.b}" +@runtime_checkable class DataclassProtocol(Protocol): """A protocol for objects that are dataclass instances.""" __dataclass_fields__: ClassVar[dict[str, Any]] +@runtime_checkable class Locatable(Protocol): """A protocol for 0-based half-open objects located on a reference sequence.""" @@ -66,16 +146,17 @@ class Locatable(Protocol): end: int +@runtime_checkable class Stranded(Protocol): """A protocol for stranded BED types.""" - strand: BedStrand + strand: BedStrand | None class BedType(ABC, DataclassProtocol): """An abstract base class for all types of BED records.""" - def __new__(cls, *args: Any, **kwargs: Any) -> "BedType": + def __new__(cls, *args: object, **kwargs: object) -> "BedType": if not dataclasses.is_dataclass(cls): raise TypeError("You must mark custom BED records with @dataclass!") return cast("BedType", object.__new__(cls)) @@ -84,7 +165,7 @@ def __new__(cls, *args: Any, **kwargs: Any) -> "BedType": def decode(cls, line: str) -> "BedType": """Decode a line of text into a BED record.""" row: list[str] = line.strip().split() - coerced: dict[str, Any] = {} + coerced: dict[str, object] = {} try: zipped = list(zip(fields(cls), row, strict=True)) @@ -94,9 +175,14 @@ def decode(cls, line: str) -> "BedType": f" '{' '.join(row)}'" ) from None + hints: dict[str, Type] = get_type_hints(cls) + for field, value in zipped: try: - coerced[field.name] = field.type(value) + if is_optional(hints[field.name]) and value == MISSING_FIELD: + coerced[field.name] = None + else: + coerced[field.name] = singular_non_optional_type(field.type)(value) except ValueError: raise TypeError( f"Tried to build the BED field '{field.name}' (of type '{field.type.__name__}')" @@ -117,6 +203,7 @@ class PointBed(BedType, ABC): contig: str start: int + @property def length(self) -> int: """The length of this record.""" return 1 @@ -138,6 +225,7 @@ def __post_init__(self) -> None: if self.start >= self.end or self.start < 0: raise ValueError("start must be greater than 0 and less than end!") + @property def length(self) -> int: """The length of this record.""" return self.end - self.start @@ -204,7 +292,7 @@ class Bed4(SimpleBed): contig: str start: int end: int - name: str + name: str | None @dataclass @@ -214,8 +302,8 @@ class Bed5(SimpleBed): contig: str start: int end: int - name: str - score: int + name: str | None + score: int | None @dataclass @@ -225,9 +313,9 @@ class Bed6(SimpleBed, Stranded): contig: str start: int end: int - name: str - score: int - strand: BedStrand + name: str | None + score: int | None + strand: BedStrand | None # @dataclass @@ -260,10 +348,10 @@ class BedPE(PairBed): contig2: str start2: int end2: int - name: str - score: int - strand1: BedStrand - strand2: BedStrand + name: str | None + score: int | None + strand1: BedStrand | None + strand2: BedStrand | None @property def bed1(self) -> Bed6: @@ -306,9 +394,9 @@ class BedWriter(Generic[BedKind], ContextManager): bed_kind: type[BedKind] | None - def __class_getitem__(cls, key: Any) -> type: + def __class_getitem__(cls, key: object) -> type: """Wrap all objects of this class to become generic aliases.""" - return typing._GenericAlias(cls, key) # type: ignore[attr-defined,no-any-return] + return _GenericAlias(cls, key) # type: ignore[no-any-return] def __new__(cls, handle: io.TextIOWrapper) -> "BedWriter[BedKind]": """Bind the kind of BED type to this class for later introspection.""" @@ -337,6 +425,13 @@ def __exit__( self.close() return super().__exit__(__exc_type, __exc_value, __traceback) + @classmethod_generic + def from_path(cls, path: Path | str) -> "BedWriter[BedKind]": + """Open a BED reader from a file path.""" + reader = cls(handle=Path(path).open("w")) # type: ignore[operator] + reader.bed_kind = None if len(cls.__args__) == 0 else cls.__args__[0] # type: ignore[attr-defined] + return cast("BedWriter[BedKind]", reader) + def close(self) -> None: """Close the underlying IO handle.""" self._handle.close() @@ -390,9 +485,9 @@ class BedReader(Generic[BedKind], ContextManager, Iterable[BedKind]): bed_kind: type[BedKind] | None - def __class_getitem__(cls, key: Any) -> type: + def __class_getitem__(cls, key: object) -> type: """Wrap all objects of this class to become generic aliases.""" - return typing._GenericAlias(cls, key) # type: ignore[attr-defined,no-any-return] + return _GenericAlias(cls, key) # type: ignore[no-any-return] def __new__(cls, handle: io.TextIOWrapper) -> "BedReader[BedKind]": """Bind the kind of BED type to this class for later introspection.""" @@ -413,6 +508,7 @@ def __enter__(self) -> "BedReader[BedKind]": def __iter__(self) -> Iterator[BedKind]: """Iterate through the BED records of this IO handle.""" + # TODO: Implement __next__ and type this class as an iterator. if self.bed_kind is None: raise NotImplementedError("Untyped reading is not yet supported!") for line in self._handle: @@ -432,12 +528,12 @@ def __exit__( self.close() return super().__exit__(__exc_type, __exc_value, __traceback) - @classmethod - def from_path(cls, path: Path | str, bed_kind: type[BedKind]) -> "BedReader[BedKind]": + @classmethod_generic + def from_path(cls, path: Path | str) -> "BedReader[BedKind]": """Open a BED reader from a file path.""" - reader = cls(handle=Path(path).open()) - reader.bed_kind = bed_kind - return reader + reader = cls(handle=Path(path).open()) # type: ignore[operator] + reader.bed_kind = None if len(cls.__args__) == 0 else cls.__args__[0] # type: ignore[attr-defined] + return cast("BedReader[BedKind]", reader) def close(self) -> None: """Close the underlying IO handle.""" diff --git a/tests/test_bedspec.py b/tests/test_bedspec.py index 3ae43b2..1564174 100644 --- a/tests/test_bedspec.py +++ b/tests/test_bedspec.py @@ -5,6 +5,7 @@ import pytest +from bedspec import MISSING_FIELD from bedspec import Bed2 from bedspec import Bed3 from bedspec import Bed4 @@ -16,20 +17,33 @@ from bedspec import BedStrand from bedspec import BedType from bedspec import BedWriter +from bedspec import Locatable from bedspec import PairBed from bedspec import PointBed from bedspec import SimpleBed from bedspec import Stranded +from bedspec._bedspec import is_union + + +def test_is_union() -> None: + """Test that a union type is a union type.""" + # TODO: have a positive unit test for is_union + assert not is_union(type(int)) + assert not is_union(type(None)) def test_bed_strand() -> None: """Test that BED strands behave as string.""" assert BedStrand("+") == BedStrand.POSITIVE assert BedStrand("-") == BedStrand.NEGATIVE - assert BedStrand(".") == BedStrand.UNKNOWN assert str(BedStrand.POSITIVE) == "+" assert str(BedStrand.NEGATIVE) == "-" - assert str(BedStrand.UNKNOWN) == "." + + +def test_bed_strand_opposite() -> None: + """Test that we return an opposite BED strand.""" + assert BedStrand.POSITIVE.opposite() == BedStrand.NEGATIVE + assert BedStrand.NEGATIVE.opposite() == BedStrand.POSITIVE def test_bed_color() -> None: @@ -49,6 +63,13 @@ def test_all_bed_types_are_dataclasses(bed_type: type[BedType]) -> None: assert dataclasses.is_dataclass(bed_type) +def test_locatable_structural_type() -> None: + """Test that the Locatable structural type is set correctly.""" + _: Locatable = Bed6( + contig="chr1", start=1, end=2, name="foo", score=3, strand=BedStrand.POSITIVE + ) + + def test_stranded_structural_type() -> None: """Test that the Stranded structural type is set correctly.""" _: Stranded = Bed6( @@ -108,6 +129,11 @@ def test_point_bed_types_have_a_territory() -> None: assert list(Bed2(contig="chr1", start=1).territory()) == [expected] +def test_point_bed_types_are_length_1() -> None: + """Test that a point BED has a length of 1.""" + assert Bed2(contig="chr1", start=1).length == 1 + + def test_simple_bed_types_have_a_territory() -> None: """Test that simple BEDs are their own territory.""" for record in ( @@ -119,6 +145,13 @@ def test_simple_bed_types_have_a_territory() -> None: assert list(record.territory()) == [record] +def test_simple_bed_types_have_length() -> None: + """Test that a simple BED has the right length.""" + assert Bed3(contig="chr1", start=1, end=2).length == 1 + assert Bed3(contig="chr1", start=1, end=3).length == 2 + assert Bed3(contig="chr1", start=1, end=4).length == 3 + + def test_simple_bed_validates_start_and_end() -> None: """Test that a simple BED record validates its start and end.""" with pytest.raises(ValueError): @@ -333,6 +366,32 @@ def test_bed_writer_can_write_all_bed_types(bed: BedType, expected: str, tmp_pat assert Path(tmp_path / "test.bed").read_text() == expected +def test_bed_writer_can_be_closed(tmp_path: Path) -> None: + """Test that we can close a BED writer.""" + path: Path = tmp_path / "test.bed" + writer = BedWriter[Bed3](open(path, "w")) + writer.write(Bed3(contig="chr1", start=1, end=2)) + writer.close() + + with pytest.raises(ValueError, match="I/O operation on closed file"): + writer.write(Bed3(contig="chr1", start=1, end=2)) + + +def test_bed_wrtier_can_write_bed_records_from_a_path(tmp_path: Path) -> None: + """Test that the BED write can write BED records from a path if it is typed.""" + + bed: Bed3 = Bed3(contig="chr1", start=1, end=2) + + with BedWriter[Bed3].from_path(tmp_path / "test1.bed") as writer: + writer.write(bed) + + assert (tmp_path / "test1.bed").read_text() == "chr1\t1\t2\n" + + with BedWriter[Bed3].from_path(str(tmp_path / "test2.bed")) as writer: + writer.write(bed) + + assert (tmp_path / "test2.bed").read_text() == "chr1\t1\t2\n" + def test_bed_writer_can_write_all_at_once(tmp_path: Path) -> None: """Test that the BED writer can write multiple BED records at once.""" @@ -414,6 +473,14 @@ def test_bed_writer_write_comment_without_prefix_pound_symbol(tmp_path: Path) -> assert Path(tmp_path / "test.bed").read_text() == expected +def test_bed_writer_can_be_used_as_context_manager(tmp_path: Path) -> None: + """Test that the BED writer can be used as a context manager.""" + with BedWriter[Bed2](open(tmp_path / "test.bed", "w")) as handle: + handle.write(Bed2(contig="chr1", start=1)) + handle.write(Bed2(contig="chr2", start=2)) + + expected = "chr1\t1\nchr2\t2\n" + assert Path(tmp_path / "test.bed").read_text() == expected def test_bed_reader_can_read_bed_records_if_typed(tmp_path: Path) -> None: """Test that the BED reader can read BED records if the reader is typed.""" @@ -430,6 +497,17 @@ def test_bed_reader_can_read_bed_records_if_typed(tmp_path: Path) -> None: assert list(BedReader[Bed3](handle)) == [bed] +def test_bed_reader_can_be_closed(tmp_path: Path) -> None: + """Test that we can close a BED reader.""" + path: Path = tmp_path / "test.bed" + path.touch() + reader = BedReader[Bed3](open(path)) + reader.close() + + with pytest.raises(ValueError, match="I/O operation on closed file"): + next(iter(reader)) + + def test_bed_reader_can_read_bed_records_from_a_path(tmp_path: Path) -> None: """Test that the BED reader can read BED records from a path if it is typed.""" @@ -441,10 +519,10 @@ def test_bed_reader_can_read_bed_records_from_a_path(tmp_path: Path) -> None: assert Path(tmp_path / "test.bed").read_text() == "chr1\t1\t2\n" - reader = BedReader[Bed3].from_path(tmp_path / "test.bed", bed_kind=Bed3) + reader = BedReader[Bed3].from_path(tmp_path / "test.bed") assert list(reader) == [bed] - reader = BedReader[Bed3].from_path(str(tmp_path / "test.bed"), bed_kind=Bed3) + reader = BedReader[Bed3].from_path(str(tmp_path / "test.bed")) assert list(reader) == [bed] @@ -484,3 +562,35 @@ def test_bed_reader_can_read_bed_records_with_comments(tmp_path: Path) -> None: with open(tmp_path / "test.bed", "r") as handle: assert list(BedReader[Bed3](handle)) == [bed] + + +def test_bed_reader_can_read_optional_string_types(tmp_path: Path) -> None: + """Test that the BED reader can read BED records with optional string types.""" + + bed: Bed4 = Bed4(contig="chr1", start=1, end=2, name=None) + + (tmp_path / "test.bed").write_text(f"chr1\t1\t2\t{MISSING_FIELD}\n") + + with open(tmp_path / "test.bed", "r") as handle: + assert list(BedReader[Bed4](handle)) == [bed] + + +def test_bed_reader_can_read_optional_other_types(tmp_path: Path) -> None: + """Test that the BED reader can read BED records with optional other types.""" + + bed: Bed5 = Bed5(contig="chr1", start=1, end=2, name="foo", score=None) + + (tmp_path / "test.bed").write_text(f"chr1\t1\t2\tfoo\t{MISSING_FIELD}\n") + + with open(tmp_path / "test.bed", "r") as handle: + assert list(BedReader[Bed5](handle)) == [bed] + + +def test_bed_reader_can_be_used_as_context_manager(tmp_path: Path) -> None: + """Test that the BED reader can be used as a context manager.""" + bed: Bed4 = Bed4(contig="chr1", start=1, end=2, name=None) + + (tmp_path / "test.bed").write_text(f"chr1\t1\t2\t{MISSING_FIELD}\n") + + with BedReader[Bed4](open(tmp_path / "test.bed")) as reader: + assert list(reader) == [bed]