From c5e868a2e6da4e5162c9b8e7c41260abf93d5d44 Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Sun, 17 Mar 2024 15:22:07 +0300 Subject: [PATCH 1/6] rebase --- asynch/proto/streams/buffered.py | 43 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/asynch/proto/streams/buffered.py b/asynch/proto/streams/buffered.py index f7d3dd0..6c17398 100644 --- a/asynch/proto/streams/buffered.py +++ b/asynch/proto/streams/buffered.py @@ -111,12 +111,23 @@ def __init__(self, reader: StreamReader, buffer_max_size: int = constants.BUFFER self.current_buffer_size = 0 self.position = 0 + def _reset_buffer(self): + self.position = 0 + self.buffer = bytearray() + async def _read_into_buffer(self): packet = await self.reader.read(self.buffer_max_size) self.buffer.extend(packet) self.current_buffer_size = len(self.buffer) + async def _update_buffer_if_full(self): + if self.position == self.current_buffer_size: + self._reset_buffer() + await self._read_into_buffer() + def _read_one(self): + if not self.buffer and not self.position: + return b"" packet = self.buffer[self.position] self.position += 1 return packet @@ -124,18 +135,23 @@ def _read_one(self): async def read_varint(self): packets = bytearray() while True: - if self.position == self.current_buffer_size: - self._reset_buffer() - await self._read_into_buffer() + await self._update_buffer_if_full() packet = self._read_one() packets.append(packet) if packet < 0x80: break return leb128.u.decode(packets) - def _reset_buffer(self): - self.position = 0 - self.buffer = bytearray() + async def read_bytes(self, length: int): + packets = bytearray() + while length > 0: + await self._update_buffer_if_full() + read_position = self.position + length + packet = self.buffer[self.position : read_position] # noqa: E203 + length -= len(packet) + self.position += len(packet) + packets.extend(packet) + return packets async def read_str(self, as_bytes: bool = False): length = await self.read_varint() @@ -150,21 +166,6 @@ async def read_fixed_str(self, length: int, as_bytes: bool = False): return packet return packet.decode() - async def read_bytes(self, length: int): - packets = bytearray() - while length > 0: - if self.position == self.current_buffer_size: - self._reset_buffer() - await self._read_into_buffer() - - read_position = self.position + length - packet = self.buffer[self.position : read_position] # noqa: E203 - length -= len(packet) - self.position += len(packet) - packets.extend(packet) - - return packets - async def read_int(self, fmt: str): s = struct.Struct("<" + fmt) packet = await self.read_bytes(s.size) From 7ea33c3ff08e66f6fe7e5fcb57fca6f6dcb2ed9b Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Mon, 18 Mar 2024 09:06:24 +0300 Subject: [PATCH 2/6] refactor and reformat some methods of BufferedReader class --- asynch/proto/streams/buffered.py | 49 +++++++++----------------------- 1 file changed, 14 insertions(+), 35 deletions(-) diff --git a/asynch/proto/streams/buffered.py b/asynch/proto/streams/buffered.py index 6c17398..f61994b 100644 --- a/asynch/proto/streams/buffered.py +++ b/asynch/proto/streams/buffered.py @@ -126,7 +126,7 @@ async def _update_buffer_if_full(self): await self._read_into_buffer() def _read_one(self): - if not self.buffer and not self.position: + if not (self.buffer or self.position): return b"" packet = self.buffer[self.position] self.position += 1 @@ -153,67 +153,46 @@ async def read_bytes(self, length: int): packets.extend(packet) return packets - async def read_str(self, as_bytes: bool = False): - length = await self.read_varint() - packet = await self.read_bytes(length) - if as_bytes: - return packet - return packet.decode() - async def read_fixed_str(self, length: int, as_bytes: bool = False): packet = await self.read_bytes(length) if as_bytes: return packet return packet.decode() + async def read_str(self, as_bytes: bool = False): + length = await self.read_varint() + return await self.read_fixed_str(length=length, as_bytes=as_bytes) + async def read_int(self, fmt: str): s = struct.Struct("<" + fmt) packet = await self.read_bytes(s.size) return s.unpack(packet)[0] - async def read_int8( - self, - ): + async def read_int8(self): return await self.read_int("b") - async def read_int16( - self, - ): + async def read_int16(self): return await self.read_int("h") - async def read_int32( - self, - ): + async def read_int32(self): return await self.read_int("i") - async def read_int64( - self, - ): + async def read_int64(self): return await self.read_int("q") - async def read_uint8( - self, - ): + async def read_uint8(self): return await self.read_int("B") - async def read_uint16( - self, - ): + async def read_uint16(self): return await self.read_int("H") - async def read_uint32( - self, - ): + async def read_uint32(self): return await self.read_int("I") - async def read_uint64( - self, - ): + async def read_uint64(self): return await self.read_int("Q") - async def read_uint128( - self, - ): + async def read_uint128(self): hi = await self.read_int("Q") lo = await self.read_int("Q") return (hi << 64) + lo From 2647a36a6e5bd498b497646f4736850378de09cb Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Mon, 18 Mar 2024 10:27:28 +0300 Subject: [PATCH 3/6] organise tests on buffered classes within one dir at test_proto/streams --- .../streams/buffered/test_buffered_readers.py | 39 +++++++++++++++++++ .../buffered/test_buffered_writers.py} | 16 +------- 2 files changed, 40 insertions(+), 15 deletions(-) create mode 100644 tests/test_proto/streams/buffered/test_buffered_readers.py rename tests/test_proto/{test_io.py => streams/buffered/test_buffered_writers.py} (64%) diff --git a/tests/test_proto/streams/buffered/test_buffered_readers.py b/tests/test_proto/streams/buffered/test_buffered_readers.py new file mode 100644 index 0000000..19cc873 --- /dev/null +++ b/tests/test_proto/streams/buffered/test_buffered_readers.py @@ -0,0 +1,39 @@ +from asyncio import StreamReader + +import pytest + +from asynch.proto.streams.buffered import BufferedReader + + +@pytest.mark.parametrize( + ("stream_data", "answer"), + [(b"9", 57), (b"32", 51)], +) +async def test_read_varint(stream_data: bytes, answer: bytes): + """When `(b"", None)`, the reading will get stuck.""" + + stream_reader = StreamReader() + stream_reader.feed_data(stream_data) + reader = BufferedReader(stream_reader) + + print() + result = await reader.read_varint() + print(f"RES = {result}") + + assert answer == result + + +@pytest.mark.parametrize( + ("stream_data", "bytes_to_read", "answer"), + [(b"", 0, b""), (b"02", 1, b"0"), (b"3456", 4, b"3456")], +) +async def test_read_bytes(stream_data: bytes, bytes_to_read: int, answer: bytes): + """If `bytes_to_read > len(stream_data)`, the reading will get stuck.""" + + stream_reader = StreamReader() + stream_reader.feed_data(stream_data) + reader = BufferedReader(stream_reader, 1) + + result = await reader.read_bytes(bytes_to_read) + + assert answer == result diff --git a/tests/test_proto/test_io.py b/tests/test_proto/streams/buffered/test_buffered_writers.py similarity index 64% rename from tests/test_proto/test_io.py rename to tests/test_proto/streams/buffered/test_buffered_writers.py index e93c105..8c5b2a4 100644 --- a/tests/test_proto/test_io.py +++ b/tests/test_proto/streams/buffered/test_buffered_writers.py @@ -1,22 +1,8 @@ -from asyncio import StreamReader from unittest.mock import AsyncMock import pytest -from asynch.proto.streams.buffered import BufferedReader, BufferedWriter - - -@pytest.mark.asyncio -async def test_BufferedReader_overflow(): - stream_data = b"1234" - - stream_reader = StreamReader() - stream_reader.feed_data(stream_data) - reader = BufferedReader(stream_reader, 1) - - result = await reader.read_bytes(4) - - assert result == stream_data +from asynch.proto.streams.buffered import BufferedWriter @pytest.mark.asyncio From efcf016fd4ecc2a5fe5e18c6361b56c8032ac701 Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Fri, 22 Mar 2024 09:51:01 +0300 Subject: [PATCH 4/6] sketch the pile of necessary tests for Buffered utilities --- .../streams/buffered/test_buffered_readers.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/test_proto/streams/buffered/test_buffered_readers.py b/tests/test_proto/streams/buffered/test_buffered_readers.py index 19cc873..dc11a97 100644 --- a/tests/test_proto/streams/buffered/test_buffered_readers.py +++ b/tests/test_proto/streams/buffered/test_buffered_readers.py @@ -7,28 +7,33 @@ @pytest.mark.parametrize( ("stream_data", "answer"), - [(b"9", 57), (b"32", 51)], + [ + (b"9", 57), + (b"32", 51), + ], ) async def test_read_varint(stream_data: bytes, answer: bytes): - """When `(b"", None)`, the reading will get stuck.""" + """When `(b"", 0)`, the reading gets stuck.""" stream_reader = StreamReader() stream_reader.feed_data(stream_data) reader = BufferedReader(stream_reader) - print() result = await reader.read_varint() - print(f"RES = {result}") assert answer == result @pytest.mark.parametrize( ("stream_data", "bytes_to_read", "answer"), - [(b"", 0, b""), (b"02", 1, b"0"), (b"3456", 4, b"3456")], + [ + (b"", 0, b""), + (b"02", 1, b"0"), + (b"3456", 4, b"3456"), + ], ) async def test_read_bytes(stream_data: bytes, bytes_to_read: int, answer: bytes): - """If `bytes_to_read > len(stream_data)`, the reading will get stuck.""" + """If `bytes_to_read > len(stream_data)`, the reading gets stuck.""" stream_reader = StreamReader() stream_reader.feed_data(stream_data) From 74bb183976859c368e25a01185cf734bad7088c5 Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Fri, 22 Mar 2024 09:52:23 +0300 Subject: [PATCH 5/6] guard the reading from the empty buffer --- asynch/proto/streams/buffered.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/asynch/proto/streams/buffered.py b/asynch/proto/streams/buffered.py index f61994b..b65ab87 100644 --- a/asynch/proto/streams/buffered.py +++ b/asynch/proto/streams/buffered.py @@ -111,6 +111,20 @@ def __init__(self, reader: StreamReader, buffer_max_size: int = constants.BUFFER self.current_buffer_size = 0 self.position = 0 + async def _refill_buffer(self) -> bool: + if self.position == self.current_buffer_size: + self._reset_buffer() + await self._read_into_buffer() + + def _is_buffer_empty(self): + return not (self.buffer or self.position) + + async def _is_buffer_readable(self): + await self._refill_buffer() + if self._is_buffer_empty(): + return False + return True + def _reset_buffer(self): self.position = 0 self.buffer = bytearray() @@ -120,14 +134,7 @@ async def _read_into_buffer(self): self.buffer.extend(packet) self.current_buffer_size = len(self.buffer) - async def _update_buffer_if_full(self): - if self.position == self.current_buffer_size: - self._reset_buffer() - await self._read_into_buffer() - def _read_one(self): - if not (self.buffer or self.position): - return b"" packet = self.buffer[self.position] self.position += 1 return packet @@ -135,7 +142,8 @@ def _read_one(self): async def read_varint(self): packets = bytearray() while True: - await self._update_buffer_if_full() + if not (await self._is_buffer_readable()): + break packet = self._read_one() packets.append(packet) if packet < 0x80: @@ -145,7 +153,8 @@ async def read_varint(self): async def read_bytes(self, length: int): packets = bytearray() while length > 0: - await self._update_buffer_if_full() + if not (await self._is_buffer_readable()): + break read_position = self.position + length packet = self.buffer[self.position : read_position] # noqa: E203 length -= len(packet) From 8ee24b5612d3121229ca96b261092cc4709d5cb9 Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Fri, 22 Mar 2024 10:54:01 +0300 Subject: [PATCH 6/6] fix type hints --- asynch/proto/streams/buffered.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asynch/proto/streams/buffered.py b/asynch/proto/streams/buffered.py index b65ab87..f146140 100644 --- a/asynch/proto/streams/buffered.py +++ b/asynch/proto/streams/buffered.py @@ -111,7 +111,7 @@ def __init__(self, reader: StreamReader, buffer_max_size: int = constants.BUFFER self.current_buffer_size = 0 self.position = 0 - async def _refill_buffer(self) -> bool: + async def _refill_buffer(self): if self.position == self.current_buffer_size: self._reset_buffer() await self._read_into_buffer() @@ -119,7 +119,7 @@ async def _refill_buffer(self) -> bool: def _is_buffer_empty(self): return not (self.buffer or self.position) - async def _is_buffer_readable(self): + async def _is_buffer_readable(self) -> bool: await self._refill_buffer() if self._is_buffer_empty(): return False