Skip to content

Commit

Permalink
Tidying up docs
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Kudinkin <[email protected]>
  • Loading branch information
alexeykudinkin committed Oct 28, 2024
1 parent 1230500 commit 2037198
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
20 changes: 17 additions & 3 deletions python/ray/data/_internal/output_buffer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any

import pyarrow

from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data.block import Block, BlockAccessor, DataBatch
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR
Expand Down Expand Up @@ -73,6 +75,11 @@ def next(self) -> Block:
block_to_yield = self._buffer.build()
block_remainder = None
block = BlockAccessor.for_block(block_to_yield)

# TODO remove
num_bytes_per_row = -1
target_num_rows = -1

if (
block.size_bytes()
>= MAX_SAFE_BLOCK_SIZE_FACTOR * self._target_max_block_size
Expand All @@ -89,15 +96,22 @@ def next(self) -> Block:
# into appropriately sized ones:
#
# - (Finalized) Target blocks sliced from the original one
# are *copied* to avoid holding up original one
# and are *copied* to avoid referencing original blocks
# - Temporary remainder of the block should *NOT* be copied
# such as to avoid repeatedly copying the remainder of the
# block, resulting in O(N^2) total bytes being copied
# such as to avoid repeatedly copying the remainder bytes
# of the block, resulting in O(M * N) total bytes being
# copied, where N is the total number of bytes in the original
# block and M is the number of blocks that will be produced by
# this iterator
block_to_yield = block.slice(0, target_num_rows, copy=True)
block_remainder = block.slice(
target_num_rows, block.num_rows(), copy=False
)

if isinstance(block_to_yield, pyarrow.Table):
print(f">>> [DBG] yielding block: {block_to_yield.num_rows}, remaining block: {block_remainder.num_rows if block_remainder else -1}; "
f"target num rows: {target_num_rows}, target max block size: {self._target_max_block_size}, num bytes per row: {num_bytes_per_row}")

self._buffer = DelegatingBlockBuilder()
if block_remainder is not None:
self._buffer.add_block(block_remainder)
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/tests/test_dynamic_block_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ def test_block_slicing(
),
override_num_blocks=num_tasks,
).materialize()

print(f">>> [DBG] test_block_slicing {ds._plan}")

assert ds._plan.initial_num_blocks() == expected_num_blocks

block_sizes = []
Expand Down

0 comments on commit 2037198

Please sign in to comment.