Skip to content

Commit

Permalink
Change filename column name to file_name (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
praateekmahajan authored Dec 24, 2024
1 parent b8ff71e commit 4fb7f54
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 70 deletions.
6 changes: 3 additions & 3 deletions nemo_curator/datasets/doc_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def read_json(
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.
Expand Down Expand Up @@ -102,7 +102,7 @@ def read_parquet(
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
columns: If not None, only these columns will be read from the file.
There is a significant performance gain when specifying columns for Parquet files.
Expand Down Expand Up @@ -135,7 +135,7 @@ def read_pickle(
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
columns: If not None, only these columns will be read from the file.
"""
Expand Down
4 changes: 2 additions & 2 deletions nemo_curator/datasets/parallel_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def read_single_simple_bitext_file_pair(
tgt_lang (str): Target language, in ISO-639-1 (two character) format (e.g. 'en')
doc_id (str, optional): A string document id to assign to every segment in the file. Defaults to None.
backend (str, optional): Backend of the data frame. Defaults to "cudf".
add_filename (bool, optional): Add filename as an extra field to every segment in the file. Defaults to False.
add_filename (bool, optional): Add "file_name" as an extra field to every segment in the file. Defaults to False.
Returns:
Union[dd.DataFrame, dask_cudf.DataFrame]
Expand Down Expand Up @@ -162,6 +162,6 @@ def read_single_simple_bitext_file_pair(
df_combined["tgt_lang"] = tgt_lang

if add_filename:
df_combined["filename"] = remove_path_extension(src_input_file)
df_combined["file_name"] = remove_path_extension(src_input_file)

return df_combined
2 changes: 1 addition & 1 deletion nemo_curator/download/arxiv.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def download_arxiv(
"text": str,
"id": str,
"source_id": str,
"filename": str,
"file_name": str,
}
dataset = download_and_extract(
arxiv_urls,
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/download/commoncrawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def download_common_crawl(
"url": str,
"warc_id": str,
"source_id": str,
"filename": str,
"file_name": str,
}
dataset = download_and_extract(
common_crawl_urls,
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/download/doc_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def _download_and_extract_single_partition(
partition = pd.DataFrame(records)
filename = os.path.basename(output_path)
output_dir = os.path.dirname(output_path)
partition["filename"] = filename
partition["file_name"] = filename
single_partition_write_with_filename(partition, output_dir, output_type=output_type)
if not keep_raw_download:
os.remove(downloaded_file)
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/download/wikipedia.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ def download_wikipedia(
"url": str,
"language": str,
"source_id": str,
"filename": str,
"file_name": str,
}
dataset = download_and_extract(
wikipedia_urls,
Expand Down
12 changes: 6 additions & 6 deletions nemo_curator/modules/dataset_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(
partition_to_filename: Callable[[int], str] = default_filename,
) -> None:
"""
Randomly permutes the dataset. This will make the original "filename" column invalid, so if the column is present it will be overwritten.
Randomly permutes the dataset. This will make the original "file_name" column invalid, so if the column is present it will be overwritten.
Args:
seed: The random seed that will be used to determine which partition (file) each datapoint goes to.
Setting the seed will guarantee determinism, but may be slightly slower (20-30% slower)
Expand Down Expand Up @@ -52,8 +52,8 @@ def shuffle_deterministic(self, dataset: DocumentDataset) -> DocumentDataset:
shuffled_df = dataset.df.set_index(self.rand_col, npartitions=new_npartitions)
shuffled_df = shuffled_df.reset_index(drop=True)

if "filename" in shuffled_df:
shuffled_df["filename"] = shuffled_df.map_partitions(self._add_filename)
if "file_name" in shuffled_df:
shuffled_df["file_name"] = shuffled_df.map_partitions(self._add_filename)

return DocumentDataset(shuffled_df)

Expand Down Expand Up @@ -98,15 +98,15 @@ def _partition_shuffle(self, partition, partition_info=None):
drop=True
)

if "filename" in partition:
if "file_name" in partition:
filename = self.partition_to_filename(partition_num)
partition["filename"] = filename
partition["file_name"] = filename

return partition

def _add_filename(self, partition, partition_info=None):
if partition_info is None:
return ["filename"] * len(partition)
return ["file_name"] * len(partition)

filename = self.partition_to_filename(partition_info["number"])

Expand Down
38 changes: 19 additions & 19 deletions nemo_curator/utils/distributed_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ def select_columns(
) -> Union[dd.DataFrame, pd.DataFrame, "cudf.DataFrame"]:
# We exclude parquet because the parquet readers already support column selection
if filetype in ["jsonl", "json"] and columns is not None:
if add_filename and "filename" not in columns:
columns.append("filename")
if add_filename and "file_name" not in columns:
columns.append("file_name")
df = df[columns]

return df
Expand All @@ -299,12 +299,12 @@ def read_single_partition(
) -> Union["cudf.DataFrame", pd.DataFrame]:
"""
This function reads a file with cuDF, sorts the columns of the DataFrame
and adds a "filename" column.
and adds a "file_name" column.
Args:
files: The path to the jsonl files to read.
backend: The backend to use for reading the data. Either "cudf" or "pandas".
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.
Expand Down Expand Up @@ -368,7 +368,7 @@ def read_single_partition(
for file in files:
df = read_f(file, **read_kwargs, **kwargs)
if add_filename:
df["filename"] = os.path.basename(file)
df["file_name"] = os.path.basename(file)
df = select_columns(df, io_columns, filetype, add_filename)
df_ls.append(df)

Expand Down Expand Up @@ -429,7 +429,7 @@ def extract_filename(path: str) -> str:

read_kwargs["include_path_column"] = add_filename
read_kwargs["path_converter"] = extract_filename
postprocessing_func = lambda df: df.rename(columns={"path": "filename"})
postprocessing_func = lambda df: df.rename(columns={"path": "file_name"})

elif file_type == "parquet":
if backend == "cudf" and not DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA:
Expand Down Expand Up @@ -509,7 +509,7 @@ def read_pandas_pickle(
Args:
file: The path to the pickle file to read.
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
columns: If not None, only these columns will be read from the file.
Returns:
A Pandas DataFrame.
Expand Down Expand Up @@ -543,7 +543,7 @@ def read_data(
file_type: The type of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
add_filename: Whether to add a "file_name" column to the DataFrame.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.
Expand Down Expand Up @@ -686,14 +686,14 @@ def single_partition_write_with_filename(
Args:
df: A DataFrame.
output_file_dir: The output file path.
keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
keep_filename_column: Boolean representing whether to keep or drop the "file_name" column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".
Returns:
If the DataFrame is non-empty, return a Series containing a single element, True.
If the DataFrame is empty, return a Series containing a single element, False.
"""
assert "filename" in df.columns
assert "file_name" in df.columns

if len(df) > 0:
empty_partition = False
Expand All @@ -709,14 +709,14 @@ def single_partition_write_with_filename(
success_ser = pd.Series([empty_partition])

if not empty_partition:
filenames = df.filename.unique()
filenames = df.file_name.unique()
filenames = list(filenames.values_host) if is_cudf_type(df) else list(filenames)
num_files = len(filenames)

for filename in filenames:
out_df = df[df.filename == filename] if num_files > 1 else df
out_df = df[df.file_name == filename] if num_files > 1 else df
if not keep_filename_column:
out_df = out_df.drop("filename", axis=1)
out_df = out_df.drop("file_name", axis=1)

filename = (
Path(filename).stem if output_type != "bitext" else Path(filename).name
Expand Down Expand Up @@ -831,13 +831,13 @@ def write_to_disk(
"""
This function writes a Dask DataFrame to the specified file path.
If write_to_filename is True, then it expects the
DataFrame to have a "filename" column that specifies where to write the document.
DataFrame to have a "file_name" column that specifies where to write the document.
Args:
df: A Dask DataFrame.
output_path: The output file path.
write_to_filename: Boolean representing whether to write the filename using the "filename" column.
keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
write_to_filename: Boolean representing whether to write the filename using the "file_name" column.
keep_filename_column: Boolean representing whether to keep or drop the "file_name" column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".
"""
Expand All @@ -856,9 +856,9 @@ def write_to_disk(
)

# output_path is a directory
elif write_to_filename and "filename" not in df.columns:
elif write_to_filename and "file_name" not in df.columns:
raise ValueError(
"write_using_filename is True but no filename column found in DataFrame"
"write_using_filename is True but no file_name column found in DataFrame"
)

if is_cudf_type(df):
Expand Down Expand Up @@ -890,7 +890,7 @@ def write_to_disk(
os.makedirs(output_path, exist_ok=True)
tmp_output_file_dir = os.path.join(output_path, ".tmp")
os.makedirs(tmp_output_file_dir, exist_ok=True)
file_name = os.path.basename(list(df.filename.unique())[0])
file_name = os.path.basename(list(df.file_name.unique())[0])
else:
tmp_output_file_dir = os.path.join(output_path, ".tmp")
os.makedirs(tmp_output_file_dir, exist_ok=True)
Expand Down
2 changes: 1 addition & 1 deletion nemo_curator/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def separate_by_metadata(
Args:
input_data: Either a DataFrame or a string representing the path to the input directory.
If a DataFrame is provided, it must have a 'filename' column for the shard.
If a DataFrame is provided, it must have a "file_name" column for the shard.
output_dir: The base directory for which all metadata based subdirs will be created under
metadata_field: The metadata field to split on
remove_metadata: Whether to remove the metadata from the dataframe when saving it
Expand Down
18 changes: 9 additions & 9 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def test_multifile_single_partition(
keep_filename_column,
file_ext,
):
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})
df = pd.DataFrame({"a": [1, 2, 3], "file_name": ["file0", "file1", "file1"]})

single_partition_write_with_filename(
df=df,
Expand All @@ -165,7 +165,7 @@ def test_multifile_single_partition(
assert os.path.exists(tmp_path / f"file1.{file_ext}")

if not keep_filename_column:
df = df.drop("filename", axis=1)
df = df.drop("file_name", axis=1)

df1 = read_single_partition(
files=[tmp_path / f"file0.{file_ext}"], backend="pandas", filetype=file_ext
Expand All @@ -185,7 +185,7 @@ def test_singlefile_single_partition(
keep_filename_column,
file_ext,
):
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file2", "file2", "file2"]})
df = pd.DataFrame({"a": [1, 2, 3], "file_name": ["file2", "file2", "file2"]})

single_partition_write_with_filename(
df=df,
Expand All @@ -197,14 +197,14 @@ def test_singlefile_single_partition(
assert os.path.exists(tmp_path / f"file2.{file_ext}")

if not keep_filename_column:
df = df.drop("filename", axis=1)
df = df.drop("file_name", axis=1)
got = read_single_partition(
files=[tmp_path / f"file2.{file_ext}"], backend="pandas", filetype=file_ext
)
assert_eq(got, df)

def test_multifile_single_partition_error(self, tmp_path):
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})
df = pd.DataFrame({"a": [1, 2, 3], "file_name": ["file0", "file1", "file1"]})

with pytest.raises(ValueError, match="Unknown output type"):
single_partition_write_with_filename(
Expand All @@ -220,13 +220,13 @@ def test_multifile_single_partition_error(self, tmp_path):
],
)
def test_multifile_multi_partition(self, tmp_path, file_ext, read_f):
df1 = pd.DataFrame({"a": [1, 2, 3], "filename": ["file1", "file2", "file2"]})
df1 = pd.DataFrame({"a": [1, 2, 3], "file_name": ["file1", "file2", "file2"]})
df2 = df1.copy()
df2["filename"] = "file3"
df2["file_name"] = "file3"
df3 = df1.copy()
df3["filename"] = ["file4", "file5", "file6"]
df3["file_name"] = ["file4", "file5", "file6"]
ddf = dd.concat([df1, df2, df3])
ddf["filename"] = ddf["filename"] + f".{file_ext}"
ddf["file_name"] = ddf["file_name"] + f".{file_ext}"
write_to_disk(
df=ddf,
output_path=tmp_path / file_ext,
Expand Down
18 changes: 6 additions & 12 deletions tests/test_read_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ def test_read_data_blocksize_add_filename_jsonl(mock_multiple_jsonl_files, backe
columns=None,
)

assert "filename" in df.columns
file_names = df["filename"].unique().compute()
assert "file_name" in df.columns
file_names = df["file_name"].unique().compute()
if backend == "cudf":
file_names = file_names.to_pandas()

Expand Down Expand Up @@ -340,13 +340,7 @@ def test_read_data_blocksize_add_filename_parquet(mock_multiple_parquet_files, b
pytest.param("cudf", "jsonl", marks=pytest.mark.gpu),
pytest.param("cudf", "parquet", marks=pytest.mark.gpu),
("pandas", "jsonl"),
pytest.param(
"pandas",
"parquet",
marks=pytest.mark.xfail(
reason="filename column inaccessible with pandas backend and parquet"
),
),
("pandas", "parquet"),
],
)
def test_read_data_fpp_add_filename(
Expand All @@ -369,8 +363,8 @@ def test_read_data_fpp_add_filename(
)

assert list(df.columns) == list(df.head().columns)
assert set(df.columns) == {"filename", "id", "text"}
file_names = df["filename"].unique().compute()
assert set(df.columns) == {"file_name", "id", "text"}
file_names = df["file_name"].unique().compute()
if backend == "cudf":
file_names = file_names.to_pandas()

Expand Down Expand Up @@ -445,7 +439,7 @@ def test_read_data_select_columns(
if not add_filename:
assert list(df.columns) == sorted(cols_to_select)
else:
assert list(df.columns) == sorted(cols_to_select + ["filename"])
assert list(df.columns) == sorted(cols_to_select + ["file_name"])


@pytest.mark.parametrize(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_separate_by_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _write_data(num_files, file_ext):
dfs = []
for i in range(num_files):
partition = df.copy()
partition["filename"] = f"f{i}.{file_ext}"
partition["file_name"] = f"f{i}.{file_ext}"
dfs.append(partition)

df = dd.concat(dfs)
Expand Down
Loading

0 comments on commit 4fb7f54

Please sign in to comment.