Skip to content

Commit

Permalink
chore: small fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Jun 30, 2024
1 parent fa7ddfa commit 9b3dc6b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
4 changes: 2 additions & 2 deletions bedspec/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __exit__(
def from_path(cls, path: Path | str) -> "BedWriter[_BedKind]":
"""Open a BED writer from a file path."""
handle: TextIO | TextIOWrapper | BGZipWriter
if any(str(path).endswith(extension) for extension in _ALL_GZIP_COMPATIBLE_EXTENSIONS):
if any(Path(path).suffix == extension for extension in _ALL_GZIP_COMPATIBLE_EXTENSIONS):
handle = BGZipWriter(open(path, "wb"))
else:
handle = Path(path).open("w")
Expand Down Expand Up @@ -230,7 +230,7 @@ def __exit__(
def from_path(cls, path: Path | str) -> "BedReader[_BedKind]":
"""Open a BED reader from a plaintext or gzip compressed file path."""
handle: io.TextIOWrapper | TextIO
if any(str(path).endswith(extension) for extension in _ALL_GZIP_COMPATIBLE_EXTENSIONS):
if any(Path(path).suffix == extension for extension in _ALL_GZIP_COMPATIBLE_EXTENSIONS):
handle = cast(io.TextIOWrapper, gzip.open(path, "rt"))
else:
handle = Path(path).open("r")
Expand Down
40 changes: 40 additions & 0 deletions test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from io import IOBase
from multiprocessing import cpu_count
from typing import IO
from typing import ClassVar
from typing import ContextManager

import xbgzip.bgzip_utils as bgzip
from xbgzip.bgzip import Deflater


class BgzipWriter(IOBase, ContextManager):
encoding: ClassVar[str] = "utf-8"

def __init__(self, handle: IO[str], threads: int = cpu_count()):
self._handle: IO[str] = handle
self._buffer: bytearray = bytearray()
self._deflater: Deflater = Deflater(threads)

def writable(self) -> True:
return True

def _compress(self, process_all_chunks: bool = False) -> None:
while self._buffer:
bytes_deflated, blocks = self._deflater.deflate(self._buffer)
for block in blocks:
self._handle.write(block)
self._buffer = self._buffer[bytes_deflated:]
if not process_all_chunks and len(self._buffer) < bgzip.block_data_inflated_size:
break

def write(self, string: str) -> None:
self._buffer.extend(string.encode(encoding=self.encoding))
if len(self._buffer) > bgzip.block_batch_size * bgzip.block_data_inflated_size:
self._compress()

def close(self):
eof = bytes.fromhex("1f8b08040000000000ff0600424302001b0003000000000000000000")
if self._buffer:
self._compress(process_all_chunks=True)
self._handle.write(eof.decode("utf-8"))

0 comments on commit 9b3dc6b

Please sign in to comment.