From fb764f6fc225135b4a1bd5ecb66ad6cc07872e70 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 9 Jan 2025 13:29:23 +0100 Subject: [PATCH] [pyarrow] Add skip_empty=True option to merging. --- law/contrib/pyarrow/util.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/law/contrib/pyarrow/util.py b/law/contrib/pyarrow/util.py index 94c835b1..05686ee9 100644 --- a/law/contrib/pyarrow/util.py +++ b/law/contrib/pyarrow/util.py @@ -26,6 +26,7 @@ def merge_parquet_files( callback: Callable[[int], Any] | None = None, writer_opts: dict[str, Any] | None = None, copy_single: bool = False, + skip_empty: bool = True, ) -> str: """ Merges parquet files in *src_paths* into a new file at *dst_path*. Intermediate directories are @@ -35,7 +36,8 @@ def merge_parquet_files( *callback* can refer to a callable accepting a single integer argument representing the index of the file after it was merged. *writer_opts* can be a dictionary of keyword arguments that are passed to the *ParquetWriter* instance. When *src_paths* contains only a single file and - *copy_single* is *True*, the file is copied to *dst_path* and no merging takes place. + *copy_single* is *True*, the file is copied to *dst_path* and no merging takes place. Files + containing empty tables are skipped unless *skip_empty* is *False*. The absolute, expanded *dst_path* is returned. """ @@ -81,7 +83,9 @@ def merge_parquet_files( # write the remaining ones for i, path in enumerate(src_paths[1:], 1): - writer.write_table(pq.read_table(path)) + _table = pq.read_table(path) + if not skip_empty or _table.num_rows > 0: + writer.write_table(_table) callback(i) return dst_path @@ -94,8 +98,7 @@ def merge_parquet_task( local: bool = False, cwd: str | pathlib.Path | LocalDirectoryTarget | None = None, force: bool = True, - writer_opts: dict[str, Any] | None = None, - copy_single: bool = False, + **kwargs: Any, ) -> None: """ This method is intended to be used by tasks that are supposed to merge parquet files, e.g. when @@ -108,8 +111,8 @@ def merge_parquet_task( :py:meth:`law.Task.publish_step` methods. When *force* is *True*, any existing output file is overwritten. - *writer_opts* and *copy_single* are forwarded to :py:func:`merge_parquet_files` which is used - internally for the actual merging. + All additional *kwargs* are forwarded to :py:func:`merge_parquet_files` which is used internally + for the actual merging. """ abspath = lambda p: os.path.abspath(os.path.expandvars(os.path.expanduser(get_path(p)))) @@ -130,12 +133,7 @@ def merge(inputs, output): output.remove() # merge - merge_parquet_files( - [inp.abspath for inp in inputs], - output.abspath, - writer_opts=writer_opts, - copy_single=copy_single, - ) + merge_parquet_files([inp.abspath for inp in inputs], output.abspath, **kwargs) stat = output.exists(stat=True) if not stat: