Skip to content

Commit

Permalink
Allow the OverlapDetector to be generic over input feature types (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval authored Aug 2, 2024
1 parent 592fb8e commit b449126
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 56 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,6 @@ dmypy.json
.pyre/

# VSCode configurations
.vscode/
.vscode/

.DS_Store
1 change: 1 addition & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[submodule "cgranges"]
path = cgranges
url = https://github.com/lh3/cgranges
ignore = dirty
14 changes: 13 additions & 1 deletion pybedlite/bed_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
if TYPE_CHECKING:
from pybedlite.overlap_detector import Interval


"""Maximum BED fields that can be present in a well formed BED file written to specification"""
MAX_BED_FIELDS: int = 12

Expand Down Expand Up @@ -183,6 +182,19 @@ def bed_fields(self) -> List[str]:
),
]

@property
def refname(self) -> str:
"""The reference name of the interval described by the record."""
return self.chrom

@property
def negative(self) -> bool:
"""
True if the interval is negatively stranded, False if the interval is unstranded or
positively stranded.
"""
return self.strand is BedStrand.Negative

def as_bed_line(self, number_of_output_fields: Optional[int] = None) -> str:
"""
Converts a BED record to a tab delimited BED line equivalent, including up to the number of
Expand Down
206 changes: 157 additions & 49 deletions pybedlite/overlap_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@
Utility Classes for Querying Overlaps with Genomic Regions
----------------------------------------------------------
The :class:`~pybedlite.overlap_detector.OverlapDetector` class detects and returns overlaps between
a set of regions and another region on a reference. The overlap detector may contain a collection
of interval-like Python objects that have the following properties:
* `refname` (str): The reference sequence name
* `start` (int): A 0-based start position
* `end` (int): A 0-based half-open end position
This contract is described by the :class:`~pybedlite.overlap_detector.Span` protocol.
Interval-like Python objects may also contain strandedness information which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using
the following property if it is present:
* `negative (bool)`: True if the interval is negatively stranded, False if the interval is
unstranded or positively stranded
This contract is described by the :class:`~pybedlite.overlap_detector.StrandedSpan` protocol.
Examples of Detecting Overlaps
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -32,21 +51,26 @@
The module contains the following public classes:
- :class:`~pybedlite.overlap_detector.Interval` -- Represents a region mapping to the genome
that is 0-based and open-ended
- :class:`~pybedlite.overlap_detector.Interval` -- Represents a region mapping to a reference
sequence that is 0-based and open-ended
- :class:`~pybedlite.overlap_detector.OverlapDetector` -- Detects and returns overlaps between
a set of genomic regions and another genomic region
a set of regions and another region on a reference
"""

import itertools
from pathlib import Path
from typing import Dict
from typing import Generic
from typing import Hashable
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Protocol
from typing import Set
from typing import Type
from typing import TypeVar
from typing import Union

import attr

Expand All @@ -56,15 +80,48 @@
from pybedlite.bed_source import BedSource


class Span(Hashable, Protocol):
"""
A structural type for a span on a reference sequence with zero-based open-ended coordinates.
"""

@property
def refname(self) -> str:
"""A reference sequence name."""

@property
def start(self) -> int:
"""A 0-based start position."""

@property
def end(self) -> int:
"""A 0-based open-ended end position."""


class StrandedSpan(Span, Protocol):
"""
A structural type for a stranded span on a reference sequence with zero-based open-ended
coordinates.
"""

@property
def negative(self) -> bool:
"""
True if the interval is negatively stranded, False if the interval is unstranded or
positively stranded.
"""


@attr.s(frozen=True, auto_attribs=True)
class Interval:
"""A region mapping to the genome that is 0-based and open-ended
"""A region mapping to a reference sequence that is 0-based and open-ended
Attributes:
refname (str): the refname (or chromosome)
start (int): the 0-based start position
end (int): the 0-based end position (exclusive)
negative (bool): true if the interval is on the negative strand, false otherwise
end (int): the 0-based half-open end position
negative (bool): true if the interval is negatively stranded, False if the interval is
unstranded or positively stranded
name (Optional[str]): an optional name assigned to the interval
"""

Expand Down Expand Up @@ -135,7 +192,8 @@ def from_ucsc(
Construct an `Interval` from a UCSC "position"-formatted string.
The "Position" format (referring to the "1-start, fully-closed" system as coordinates are
"positioned" in the browser)
"positioned" in the browser):
* Written as: chr1:127140001-127140001
* The location may optionally be followed by a parenthetically enclosed strand, e.g.
chr1:127140001-127140001(+).
Expand All @@ -144,6 +202,7 @@ def from_ucsc(
end coordinates.
* When in this format, the assumption is that the coordinate is **1-start,
fully-closed.**
https://genome-blog.gi.ucsc.edu/blog/2016/12/12/the-ucsc-genome-browser-coordinate-counting-systems/ # noqa: E501
Note that when the string does not have a specified strand, the `Interval`'s negative
Expand Down Expand Up @@ -178,55 +237,73 @@ def from_ucsc(
) from exception


class OverlapDetector(Iterable[Interval]):
"""Detects and returns overlaps between a set of genomic regions and another genomic region.
SpanType = TypeVar("SpanType", bound=Union[Span, StrandedSpan])
"""
A generic reference sequence span. This type variable is used for describing the generic type
contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`.
"""


class OverlapDetector(Generic[SpanType], Iterable[SpanType]):
"""Detects and returns overlaps between a set of regions and another region on a reference.
The overlap detector may contain a collection of interval-like Python objects that have the
following properties:
* `refname` (str): The reference sequence name
* `start` (int): A 0-based start position
* `end` (int): A 0-based half-open end position
Since :class:`~pybedlite.overlap_detector.Interval` objects are used both to populate the
overlap detector and to query it, the coordinate system in use is also 0-based open-ended.
Interval-like Python objects may also contain strandedness information which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using
the following property if it is present:
* `negative (bool)`: True if the interval is negatively stranded, False if the interval is
unstranded or positively stranded
The same interval may be added multiple times, but only a single instance will be returned
when querying for overlaps. Intervals with the same coordinates but different names are
treated as different intervals.
when querying for overlaps.
This detector is the most efficient when all intervals are added ahead of time.
"""

def __init__(self) -> None:
def __init__(self, intervals: Optional[Iterable[SpanType]] = None) -> None:
# A mapping from the contig/chromosome name to the associated interval tree
self._refname_to_tree: Dict[str, cr.cgranges] = {} # type: ignore
self._refname_to_indexed: Dict[str, bool] = {}
self._refname_to_intervals: Dict[str, List[Interval]] = {}
self._refname_to_intervals: Dict[str, List[SpanType]] = {}
if intervals is not None:
self.add_all(intervals)

def __iter__(self) -> Iterator[Interval]:
def __iter__(self) -> Iterator[SpanType]:
"""Iterates over the intervals in the overlap detector."""
return itertools.chain(*self._refname_to_intervals.values())

def add(self, interval: Interval) -> None:
def add(self, interval: SpanType) -> None:
"""Adds an interval to this detector.
Args:
interval: the interval to add to this detector
"""
refname = interval.refname
if refname not in self._refname_to_tree:
self._refname_to_tree[refname] = cr.cgranges() # type: ignore
self._refname_to_indexed[refname] = False
self._refname_to_intervals[refname] = []
if interval.refname not in self._refname_to_tree:
self._refname_to_tree[interval.refname] = cr.cgranges() # type: ignore
self._refname_to_indexed[interval.refname] = False
self._refname_to_intervals[interval.refname] = []

# Append the interval to the list of intervals for this tree, keeping the index
# of where it was inserted
interval_idx: int = len(self._refname_to_intervals[refname])
self._refname_to_intervals[refname].append(interval)
interval_idx: int = len(self._refname_to_intervals[interval.refname])
self._refname_to_intervals[interval.refname].append(interval)

# Add the interval to the tree
tree = self._refname_to_tree[refname]
tree = self._refname_to_tree[interval.refname]
tree.add(interval.refname, interval.start, interval.end, interval_idx)

# Flag this tree as needing to be indexed after adding a new interval, but defer
# indexing
self._refname_to_indexed[refname] = False
self._refname_to_indexed[interval.refname] = False

def add_all(self, intervals: Iterable[Interval]) -> None:
def add_all(self, intervals: Iterable[SpanType]) -> None:
"""Adds one or more intervals to this detector.
Args:
Expand All @@ -235,7 +312,7 @@ def add_all(self, intervals: Iterable[Interval]) -> None:
for interval in intervals:
self.add(interval)

def overlaps_any(self, interval: Interval) -> bool:
def overlaps_any(self, interval: Span) -> bool:
"""Determines whether the given interval overlaps any interval in this detector.
Args:
Expand All @@ -258,70 +335,101 @@ def overlaps_any(self, interval: Interval) -> bool:
else:
return True

def get_overlaps(self, interval: Interval) -> List[Interval]:
def get_overlaps(self, interval: Span) -> List[SpanType]:
"""Returns any intervals in this detector that overlap the given interval.
Args:
interval: the interval to check
Returns:
The list of intervals in this detector that overlap the given interval, or the empty
list if no overlaps exist. The intervals will be return in ascending genomic order.
list if no overlaps exist. The intervals will be returned sorted using the following
sort keys:
* The interval's start (ascending)
* The interval's end (ascending)
* The interval's strand, positive or negative (assumed to be positive if undefined)
* The interval's reference sequence name (lexicographically)
"""
tree = self._refname_to_tree.get(interval.refname)
if tree is None:
return []
else:
if not self._refname_to_indexed[interval.refname]:
tree.index()
ref_intervals: List[Interval] = self._refname_to_intervals[interval.refname]
ref_intervals: List[SpanType] = self._refname_to_intervals[interval.refname]
# NB: only return unique instances of intervals
intervals: Set[Interval] = {
intervals: Set[SpanType] = {
ref_intervals[index]
for _, _, index in tree.overlap(interval.refname, interval.start, interval.end)
}
return sorted(
intervals, key=lambda intv: (intv.start, intv.end, intv.negative, intv.name)
intervals,
key=lambda intv: (
intv.start,
intv.end,
self._negative(intv),
intv.refname,
),
)

def get_enclosing_intervals(self, interval: Interval) -> List[Interval]:
@staticmethod
def _negative(interval: Span) -> bool:
"""
True if the interval is negatively stranded, False if the interval is unstranded or
positively stranded.
"""
return getattr(interval, "negative", False)

def get_enclosing_intervals(self, interval: Span) -> List[SpanType]:
"""Returns the set of intervals in this detector that wholly enclose the query interval.
i.e. query.start >= target.start and query.end <= target.end.
i.e. `query.start >= target.start` and `query.end <= target.end`.
Args:
interval: the query interval
Args:
interval: the query interval
Returns:
The list of intervals in this detector that enclose the query interval.
The intervals will be returned in ascending genomic order.
Returns:
The list of intervals in this detector that enclose the query interval. The intervals
will be returned sorted using the following sort keys:
* The interval's start (ascending)
* The interval's end (ascending)
* The interval's strand, positive or negative (assumed to be positive if undefined)
* The interval's reference sequence name (lexicographically)
"""
results = self.get_overlaps(interval)
return [i for i in results if interval.start >= i.start and interval.end <= i.end]

def get_enclosed(self, interval: Interval) -> List[Interval]:
def get_enclosed(self, interval: Span) -> List[SpanType]:
"""Returns the set of intervals in this detector that are enclosed by the query
interval. I.e. target.start >= query.start and target.end <= query.end.
Args:
interval: the query interval
Args:
interval: the query interval
Returns:
The list of intervals in this detector that are enclosed within the query interval.
The intervals will be returned sorted using the following sort keys:
Returns:
The list of intervals in this detector that are enclosed within the query interval.
The intervals will be return in ascending genomic order.
* The interval's start (ascending)
* The interval's end (ascending)
* The interval's strand, positive or negative (assumed to be positive if undefined)
* The interval's reference sequence name (lexicographically)
"""
results = self.get_overlaps(interval)
return [i for i in results if i.start >= interval.start and i.end <= interval.end]

@classmethod
def from_bed(cls, path: Path) -> "OverlapDetector":
def from_bed(cls, path: Path) -> "OverlapDetector[BedRecord]":
"""Builds a :class:`~pybedlite.overlap_detector.OverlapDetector` from a BED file.
Args:
path: the path to the BED file
Returns:
An overlap detector for the regions in the BED file.
"""
detector = OverlapDetector()
detector: OverlapDetector[BedRecord] = OverlapDetector()

for region in BedSource(path):
detector.add(Interval.from_bedrecord(region))
detector.add(region)

return detector
Loading

0 comments on commit b449126

Please sign in to comment.