Skip to content

Commit

Permalink
[pyarrow] Add skip_empty=True option to merging.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 9, 2025
1 parent 31e255d commit fb764f6
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions law/contrib/pyarrow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def merge_parquet_files(
callback: Callable[[int], Any] | None = None,
writer_opts: dict[str, Any] | None = None,
copy_single: bool = False,
skip_empty: bool = True,
) -> str:
"""
Merges parquet files in *src_paths* into a new file at *dst_path*. Intermediate directories are
Expand All @@ -35,7 +36,8 @@ def merge_parquet_files(
*callback* can refer to a callable accepting a single integer argument representing the index of
the file after it was merged. *writer_opts* can be a dictionary of keyword arguments that are
passed to the *ParquetWriter* instance. When *src_paths* contains only a single file and
*copy_single* is *True*, the file is copied to *dst_path* and no merging takes place.
*copy_single* is *True*, the file is copied to *dst_path* and no merging takes place. Files
containing empty tables are skipped unless *skip_empty* is *False*.
The absolute, expanded *dst_path* is returned.
"""
Expand Down Expand Up @@ -81,7 +83,9 @@ def merge_parquet_files(

# write the remaining ones
for i, path in enumerate(src_paths[1:], 1):
writer.write_table(pq.read_table(path))
_table = pq.read_table(path)
if not skip_empty or _table.num_rows > 0:
writer.write_table(_table)
callback(i)

return dst_path
Expand All @@ -94,8 +98,7 @@ def merge_parquet_task(
local: bool = False,
cwd: str | pathlib.Path | LocalDirectoryTarget | None = None,
force: bool = True,
writer_opts: dict[str, Any] | None = None,
copy_single: bool = False,
**kwargs: Any,
) -> None:
"""
This method is intended to be used by tasks that are supposed to merge parquet files, e.g. when
Expand All @@ -108,8 +111,8 @@ def merge_parquet_task(
:py:meth:`law.Task.publish_step` methods. When *force* is *True*, any existing output file is
overwritten.
*writer_opts* and *copy_single* are forwarded to :py:func:`merge_parquet_files` which is used
internally for the actual merging.
All additional *kwargs* are forwarded to :py:func:`merge_parquet_files` which is used internally
for the actual merging.
"""
abspath = lambda p: os.path.abspath(os.path.expandvars(os.path.expanduser(get_path(p))))

Expand All @@ -130,12 +133,7 @@ def merge(inputs, output):
output.remove()

# merge
merge_parquet_files(
[inp.abspath for inp in inputs],
output.abspath,
writer_opts=writer_opts,
copy_single=copy_single,
)
merge_parquet_files([inp.abspath for inp in inputs], output.abspath, **kwargs)

stat = output.exists(stat=True)
if not stat:
Expand Down

0 comments on commit fb764f6

Please sign in to comment.