Skip to content

Commit

Permalink
Merge pull request #76 from samuelcolvin/compression-optional
Browse files Browse the repository at this point in the history
make clickhouse-cityhash dependency optional
  • Loading branch information
long2ice authored Aug 7, 2023
2 parents 0b602b2 + dbf4888 commit cf50aff
Show file tree
Hide file tree
Showing 6 changed files with 571 additions and 576 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ up:
@poetry update

deps:
@poetry install --no-root
@poetry install --extras compression --no-root

style: deps
@isort -src $(checkfiles)
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
> pip install asynch
```

or if you want to install [`clickhouse-cityhash`](https://pypi.org/project/clickhouse-cityhash/) to enable
transport compression

```shell
> pip install asynch[compression]
```

## Usage

Connect to ClickHouse
Expand Down
13 changes: 11 additions & 2 deletions asynch/proto/compression/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import importlib
from typing import TYPE_CHECKING, Type

from clickhouse_cityhash.cityhash import CityHash128

from asynch.errors import ChecksumDoesntMatchError, UnknownCompressionMethod
from asynch.proto.protocol import CompressionMethodByte

Expand Down Expand Up @@ -74,6 +72,8 @@ def decompress_data(self, data, uncompressed_size):
raise NotImplementedError

async def get_decompressed_data(self, method_byte, compressed_hash, extra_header_size):
CityHash128 = import_cityhash()

size_with_header = await self.reader.read_uint32()
compressed_size = size_with_header - extra_header_size - 4

Expand All @@ -90,3 +90,12 @@ async def get_decompressed_data(self, method_byte, compressed_hash, extra_header
uncompressed_size = await reader.read_uint32()
compressed = compressed[4:compressed_size]
return self.decompress_data(compressed, uncompressed_size)


def import_cityhash():
try:
from clickhouse_cityhash.cityhash import CityHash128
except ImportError as e:
raise ImportError("Please install clickhouse-cityhash to enable compression") from e
else:
return CityHash128
5 changes: 2 additions & 3 deletions asynch/proto/streams/compressed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from clickhouse_cityhash.cityhash import CityHash128

from asynch.proto import constants
from asynch.proto.compression import BaseCompressor
from asynch.proto.compression import BaseCompressor, import_cityhash
from asynch.proto.context import Context
from asynch.proto.streams.block import BlockReader, BlockWriter
from asynch.proto.streams.buffered import (
Expand All @@ -28,6 +26,7 @@ def __init__(
super().__init__(reader, self.writer, context)

async def finalize(self):
CityHash128 = import_cityhash()
await self.writer.flush()

compressed = await self.get_compressed()
Expand Down
Loading

0 comments on commit cf50aff

Please sign in to comment.