From 8693177aa68a17fc50347d9e6cf0960735ca6f3e Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 23 Jan 2025 14:00:14 -0800 Subject: [PATCH] fix tutorials Signed-off-by: Sarah Yurick --- tutorials/image-curation/image-curation.ipynb | 18 +- .../config/sem_dedup_config.yaml | 3 + tutorials/peft-curation-with-sdg/main.py | 4 +- .../red-pajama-v2-curation-tutorial.ipynb | 748 ++++++++++-------- ...pretraining-vietnamese-data-curation.ipynb | 12 +- .../single_gpu_tutorial.ipynb | 199 +++-- .../zyda2-tutorial/1_fuzzy_dedup/0_minhash.py | 3 +- .../zyda2-tutorial/1_fuzzy_dedup/1_lsh.py | 3 +- .../1_fuzzy_dedup/2_buckets_to_edges.py | 7 +- .../1_fuzzy_dedup/3_connected_components.py | 12 +- 10 files changed, 540 insertions(+), 469 deletions(-) diff --git a/tutorials/image-curation/image-curation.ipynb b/tutorials/image-curation/image-curation.ipynb index 5c7eb1b49..287c50511 100644 --- a/tutorials/image-curation/image-curation.ipynb +++ b/tutorials/image-curation/image-curation.ipynb @@ -623,7 +623,7 @@ } ], "source": [ - "from nemo_curator.cache import initialize_cache_directory\n", + "import os\n", "from nemo_curator.datasets import DocumentDataset\n", "from nemo_curator import ClusteringModel, SemanticClusterLevelDedup\n", "\n", @@ -631,14 +631,17 @@ "embeddings_dataset = DocumentDataset(dataset.metadata)\n", "\n", "semantic_dedup_outputs = \"./semantic_deduplication\"\n", - "initialize_cache_directory(semantic_dedup_outputs)\n", + "os.makedirs(semantic_dedup_outputs, exist_ok=True)\n", "\n", "# Run clustering\n", + "clustering_output = os.path.join(semantic_dedup_outputs, \"cluster_output\")\n", "clustering_model = ClusteringModel(\n", " id_column=id_col,\n", " embedding_col=\"image_embedding\",\n", " max_iter=10,\n", " n_clusters=1,\n", + " cache_dir=semantic_dedup_outputs,\n", + " clustering_save_loc=\"cluster_output\",\n", ")\n", "clustered_dataset = clustering_model(embeddings_dataset)" ] @@ -665,12 +668,18 @@ } ], "source": [ + "# Run cluster-level dedup\n", + "duplicate_output = os.path.join(semantic_dedup_outputs, \"duplicates\")\n", + "\n", "semantic_dedup = SemanticClusterLevelDedup(\n", " n_clusters=1,\n", " id_column=id_col,\n", " id_column_type=\"str\",\n", " embedding_col=\"image_embedding\",\n", " which_to_keep=\"hard\",\n", + " output_dir=duplicate_output,\n", + " cache_dir=semantic_dedup_outputs,\n", + " clustering_save_loc=\"cluster_output\",\n", ")\n", "semantic_dedup.compute_semantic_match_dfs([0.01, 0.001])\n", "deduplicated_dataset_ids = semantic_dedup.extract_dedup_data(eps_to_extract=0.01)" @@ -716,7 +725,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 15, "metadata": {}, "outputs": [ { @@ -815,9 +824,8 @@ "source": [ "import pandas as pd\n", "import os\n", - "from nemo_curator.cache import get_cache_directory\n", "\n", - "cluster_path = os.path.join(get_cache_directory(), \"clustering\", \"semdedup_pruning_tables\", \"cluster_0.parquet\")\n", + "cluster_path = os.path.join(duplicate_output, \"semdedup_pruning_tables\", \"cluster_0.parquet\")\n", "df = pd.read_parquet(cluster_path)\n", "df = df[~df[\"eps=0.001\"]]\n", "df = df.sort_values(\"cosine_sim_score\", ascending=False)\n", diff --git a/tutorials/peft-curation-with-sdg/config/sem_dedup_config.yaml b/tutorials/peft-curation-with-sdg/config/sem_dedup_config.yaml index ed32485c5..93ec29cb2 100644 --- a/tutorials/peft-curation-with-sdg/config/sem_dedup_config.yaml +++ b/tutorials/peft-curation-with-sdg/config/sem_dedup_config.yaml @@ -1,11 +1,14 @@ # Configuration file for semdantic 
dedup +cache_dir: "_temp/semdedup_cache" num_files: 16 # Embeddings configuration +embeddings_save_loc: "embeddings" embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2" embedding_batch_size: 128 # Clustering configuration +clustering_save_loc: "clustering_results" n_clusters: 20 seed: 1234 max_iter: 100 diff --git a/tutorials/peft-curation-with-sdg/main.py b/tutorials/peft-curation-with-sdg/main.py index 31ec992fd..a52a50c0e 100644 --- a/tutorials/peft-curation-with-sdg/main.py +++ b/tutorials/peft-curation-with-sdg/main.py @@ -26,7 +26,6 @@ from synthetic_gen import SyntheticGenerator from nemo_curator import AsyncOpenAIClient, ScoreFilter, Sequential -from nemo_curator.cache import initialize_cache_directory from nemo_curator.datasets import DocumentDataset from nemo_curator.filters import WordCountFilter from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter @@ -131,7 +130,7 @@ def semantic_dedupe(dataset): semdedup_config = SemDedupConfig.from_yaml( os.path.join(CONFIG_DIR, "sem_dedup_config.yaml") ) - initialize_cache_directory("_temp/semdedup_cache") + expand_outdir_and_mkdir(semdedup_config.cache_dir) semdup = SemDedup( config=semdedup_config, @@ -140,6 +139,7 @@ def semantic_dedupe(dataset): id_column_type="str", ) dedup_ids = semdup(dataset) + # When there are few duplicates we can compute the results to a list and use `isin`. result = dataset.df[dataset.df["id"].isin(dedup_ids.df["id"].compute())] return DocumentDataset(result) diff --git a/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb index 26ade710f..ae4adffd0 100644 --- a/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb +++ b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb @@ -38,7 +38,7 @@ "- Quality filtering\n", "- Document-level deduplication\n", "\n", - "For this demonstration, we will use the [RedPajama-Data-v2](#rpv2) dataset, an open source dataset for LLM pretraining." + "For demonstration, we will use the [RedPajama-Data-v2](#rpv2) dataset, an open dataset for LLM pretraining." ] }, { @@ -50,8 +50,7 @@ }, "source": [ "## 1.1 System Information\n", - "\n", - "Here some is information about the system this notebook was run on:\n", + "Here is the information on the system this notebook was run on:\n", "\n", "- **GPU**: 2 A100 nodes (each with 8 A100-SXM4-80GB)\n", "\n", @@ -67,11 +66,12 @@ "\n", "- Set your Docker credentials:\n", "\n", + "\n", " `docker login nvcr.io`\n", "\n", " Username: `$oauthtoken`\n", " \n", - " Password: ``\n", + " Password: ``\n", " \n", "- Pull the NeMo Framework Container image\n", " \n", @@ -85,7 +85,7 @@ "id": "7d57dd35-cce6-4bfa-b34a-fb4a2ea584e0", "metadata": {}, "source": [ - "# 2. Getting Started\n", + "# 2. Getting started\n", "\n", "\n", "NeMo Curator uses Dask for parallelization. Before we start using NeMo Curator, we need to start a Dask cluster. To start a multi-node Dask cluster in Slurm, we can use the `start-distributed-notebook.sh` script in this directory. 
The user will need to change the following variables:\n", @@ -135,13 +135,13 @@ " write_to_disk,\n", ")\n", "\n", - "warnings.filterwarnings(\"ignore\")\n", + "warnings.filterwarnings('ignore')\n", "base_dir = \"/path/to/data\"" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "8d728cab-d3ad-41b8-aae8-6e6927f7edaf", "metadata": { "tags": [] @@ -156,7 +156,7 @@ } ], "source": [ - "scheduler_address = os.getenv(\"SCHEDULER_ADDRESS\")\n", + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", "cpu_client = get_client(scheduler_address=scheduler_address)\n", "print(f\"Num Workers = {get_num_workers(cpu_client)}\", flush=True)" ] @@ -166,7 +166,7 @@ "id": "bf008174-a7b6-4a62-b421-0e3d84e305f2", "metadata": {}, "source": [ - "# 3. RedPajama-Data-V2 Dataset\n", + "# 3. RedPajama-Data-v2\n", "" ] }, @@ -175,18 +175,18 @@ "id": "838d6014-a906-42dc-851e-d106d4db8d66", "metadata": {}, "source": [ - "RedPajama-V2 (rpv2) is an advanced open source initiative designed to support the development of large language models (LLMs). This dataset, sourced from 84 Common Crawl snapshots, spans five major languages (English, French, Spanish, German, and Italian), making it one of the largest and most comprehensive public datasets available for LLM training.\n", + "RedPajama-V2 (rpv2) is an advanced open-source initiative designed to support the development of large language models (LLMs). This dataset, sourced from 84 CommonCrawl snapshots, spans five major languages—English, French, Spanish, German, and Italian—making it one of the largest and most comprehensive public datasets available for LLM training.\n", "\n", - "The RedPajama-V2 dataset is available on [Hugging Face](https://huggingface.co/datasets/togethercomputer/RedPajama-Data-V2).\n", + "The RedPajama-V2 dataset is available on [Huggingface](https://huggingface.co/datasets/togethercomputer/RedPajama-Data-V2).\n", "\n", - "For this tutorial, we will start with a single snapshot from rpv2, and then scale to multiple snapshots to demonstrate the pre-training data curation workflow.\n", + "For this tutorial, we will start with a single snapshot from rpv2 and then scale to multiple snapshots to demonstrate the pre-training data curation workflow.\n", "\n", - "The raw rpv2 data is stored in the compressed JSON format. We will first decompress the `json.gz` files and write them into JSONL files. For this, we will use a helper function `convert_json_gz_to_jsonl` in `helper.py`." + "The raw rpv2 data is stored in compressed json. We will first decompress the json.gz file and write them into jsonl files. For this, we will use a helper function `convert_json_gz_to_jsonl` in `helper.py`\n" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "192aafb0-524d-4a5e-85a0-a272f938d3b7", "metadata": { "tags": [] @@ -204,7 +204,7 @@ "from helper import convert_json_gz_to_jsonl\n", "\n", "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-raw\")\n", - "output_data_dir = os.path.join(base_dir, \"rpv2-2023-06\")\n", + "output_data_dir = os.path.join(base_dir,\"rpv2-2023-06\")\n", "\n", "t0 = time.time()\n", "convert_json_gz_to_jsonl(input_data_dir, output_data_dir)\n", @@ -216,12 +216,12 @@ "id": "629b19e7-c6df-4519-8f82-97798cf39457", "metadata": {}, "source": [ - "To get started, we can read the JSONL files into a `DocumentDataset`, which is the standard format for text datasets used in NeMo Curator. 
`DocumentDataset` is essentially a wrapper around a Dask DataFrame, and we can get the DataFrame by calling `input_dataset.df`:" + "To get started, we can read the jsonl files into a `DocumentDataset` which is the standard format for text dataset used in curator." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "id": "4ad35dcf-cd4b-4157-9bb2-14e90a8123fd", "metadata": { "tags": [] @@ -238,7 +238,26 @@ "source": [ "from nemo_curator.datasets import DocumentDataset\n", "\n", - "input_dataset = DocumentDataset.read_json(output_data_dir, add_filename=True)\n", + "input_dataset = DocumentDataset.read_json(output_data_dir, add_filename=True)" + ] + }, + { + "cell_type": "markdown", + "id": "b1152395-0928-4598-b36d-3a21cc221bf0", + "metadata": {}, + "source": [ + "`DocumentDataset` is essentially a wrapper around dask dataframe and we can get the dataframe by calling `input_dataset.df`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f00fef38-0ae3-494d-9027-766f5c80d883", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ "input_dataset.df.head()" ] }, @@ -287,9 +306,9 @@ "id": "2ea58d4a", "metadata": {}, "source": [ - "## 4.1 Data Resharding\n", + "## 4.1 Data resharding\n", "\n", - "The input text files have varying sizes, which leads to imbalanced partitions that could result in out of memory issues. Ideally, we want to make balanced text files of similar sizes. NeMo Curator offers utility functions to help reshard the text files into similar sizes." + "The input text files have varying sizes, which leads to imbalanced partitions that could result in out-of-memory issues. Ideally, we want to make balanced text files of similar sizes. Curator offers utility to reshard the text files to simiar sizes." ] }, { @@ -352,7 +371,7 @@ "source": [ "## 4.2 Add ID\n", "\n", - "We will assign a unique ID to each document in the dataset so that we can reference them." + "We will assign a unique ID for each document in the dataset so we can refrence them." ] }, { @@ -373,12 +392,12 @@ "id": "19380a4c-c612-49c3-9b91-02879f53dc65", "metadata": {}, "source": [ - "We will create an instance of NeMo Curator's `AddId` class and use it to add IDs to all documents in the dataset." + "We will create an instance of Curator's `AddId` class and use it to add ID for all documents in the dataset." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "id": "4f35cac5-f084-40cf-a2fd-7d232ddd5ff3", "metadata": { "tags": [] @@ -395,20 +414,18 @@ } ], "source": [ - "input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-resharded\")\n", + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-resharded\")\n", "input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)\n", - "id_data_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-id\"))\n", + "id_data_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-id\"))\n", "\n", "t0 = time.time()\n", - "\n", - "# Specify add_id call function\n", + "# specify add_id function\n", "add_id = AddId(\n", " id_field=\"id\",\n", " id_prefix=\"rpv2-2023-06\",\n", ")\n", "id_dataset = add_id(input_dataset)\n", "id_dataset.to_json(id_data_dir, write_to_filename=True)\n", - "\n", "print(f\"Adding ID took :{time.time()-t0}\")" ] }, @@ -459,9 +476,9 @@ "source": [ "## 4.3 Language ID and Separation\n", "\n", - "Data curation usually includes steps that are language-specific (e.g., using language-tuned heuristics for quality filtering). 
NeMo Curator provides utilities to identify languages. The language identification is performed using fastText.\n", + "Data curation usually includes steps that are language specific (e.g. using language-tuned heuristics for quality filtering). NeMo Curator provides utilities to identify languages. The language identification is performed using fastText.\n", "\n", - "It is worth mentioning that even though a preliminary language identification has been performed on rpv2 and we started with an English-only dataset, fastText is more accurate and thus is used for a second pass." + "It is worth mentioning that even though a preliminary language identification has been performed on rpv2 and we started with English-only dataset, fastText is more accurate so it can be used for a second pass." ] }, { @@ -481,7 +498,7 @@ "language_output_path = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-language\"))\n", "language_data_output_path = expand_outdir_and_mkdir(os.path.join(language_output_path, \"data\"))\n", "\n", - "# FastText model path\n", + "# Fasttext model path\n", "model_path = language_output_path\n", "\n", "# Define key in output .jsonl files to store the language information\n", @@ -493,7 +510,7 @@ "id": "82249135-d542-497f-896b-68c19297f434", "metadata": {}, "source": [ - "Download the fastText model for language detection:" + "Download the fastText model for langague detection." ] }, { @@ -513,12 +530,12 @@ "id": "1ca0e327-4055-48a9-8432-0aa0c21fb2b5", "metadata": {}, "source": [ - "We will create an instance of NeMo Curator's `ScoreFilter` and use a helper function `separate_by_metadata` to separate the dataset into subfolders based on language." + "We will create an instance of Curator's `ScoreFilter` and use a helper function `separate_by_metadata` to separate the dataset into subfolders based on language." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "id": "a2684904-f73b-4a3b-a2f9-e4ce8725684c", "metadata": { "tags": [] @@ -537,32 +554,32 @@ "t0 = time.time()\n", "\n", "# Load dataset\n", - "id_data_dir = os.path.join(base_dir, \"rpv2-2023-06-id\")\n", + "id_data_dir = os.path.join(base_dir,\"rpv2-2023-06-id\")\n", "input_dataset = DocumentDataset.read_json(id_data_dir, add_filename=True)\n", "\n", - "# Define the language separation pipeline\n", - "lang_filter = FastTextLangId(os.path.join(model_path, \"lid.176.bin\"))\n", + "# Define Language separation pipeline\n", + "lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))\n", "language_id_pipeline = ScoreFilter(\n", " lang_filter, \n", " score_field=language_field,\n", " text_field=\"raw_content\",\n", - " score_type=\"object\",\n", + " score_type='object'\n", ")\n", "filtered_dataset = language_id_pipeline(input_dataset)\n", "\n", - "# Drop the detailed classifier score\n", + "# drop the detailed classifier score\n", "filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(\n", - " lambda score: score[1],meta = (language_field, \"object\")\n", + " lambda score: score[1],meta = (language_field, 'object')\n", " )\n", "\n", - "# Split the dataset to corresponding language subfolders\n", + "# Split the dataset to corresponding language sub-folders\n", "language_stats = separate_by_metadata(\n", " filtered_dataset.df, \n", " language_data_output_path, \n", " metadata_field=language_field\n", ").compute()\n", "\n", - "print(f\"Time taken for splitting language: {time.time()-t0}\")" + "print(f\"Time taken for splitting language:{time.time()-t0}\")" ] }, { @@ -570,12 +587,12 @@ "id": "8a192a66-3886-41b4-8398-343ed58aa64c", "metadata": {}, "source": [ - "The English dataset has 1,088,311,520 documents compared to 1,088,468,779 documents in the raw dataset. This is because the raw dataset already detected and filtered the English dataset." + "The English dataset has 1,088,311,520 documents compared to 1,088,468,779 documents in the raw dataset. This is because the raw dataset is aleady detected and filtered to English dataset." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "id": "07216ded-5a61-46cc-b201-d816fb68d3d9", "metadata": { "tags": [] @@ -600,7 +617,7 @@ } ], "source": [ - "en_dataset_path = os.path.join(base_dir, \"rpv2-2023-06-language/data/EN\")\n", + "en_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/EN\")\n", "en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)\n", "\n", "len(en_dataset)" @@ -635,7 +652,7 @@ }, "outputs": [], "source": [ - "ja_dataset_path = os.path.join(base_dir, \"rpv2-2023-06-language/data/JA\")\n", + "ja_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/JA\")\n", "ja_dataset = DocumentDataset.read_json(ja_dataset_path, add_filename=True)\n", "\n", "ja_dataset.df.head(1)" @@ -648,12 +665,12 @@ "source": [ "## 4.4 Text cleaning\n", "\n", - "Datasets may have improperly decoded unicode characters. NeMo Curator provides utilities to fix improperly decoded unicode characters based on the heuristics defined within the `ftfy` package." + "Datasets may have improperly decoded unicode characters. Curator provides utilities to fix improperly decoded unicode characters based on the heuristics defined within the `ftfy` package." 
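As a quick standalone illustration of the kind of repair `ftfy` performs (a toy mojibake string of our own, not text from the dataset), a sketch might look like:

```python
# Minimal ftfy example: repair a string whose UTF-8 bytes were decoded with the wrong codec.
import ftfy

broken = "The Mona Lisa doesnÃ¢â‚¬â„¢t have eyebrows."
print(ftfy.fix_text(broken))  # the mojibake apostrophe is restored to a normal one
```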
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "bcc6c27b-0369-4fe4-83da-2d2c70fc7898", "metadata": { "tags": [] @@ -671,7 +688,7 @@ "import nemo_curator\n", "from nemo_curator.modifiers import UnicodeReformatter\n", "\n", - "en_dataset_path = os.path.join(base_dir, \"rpv2-2023-06-language/data/EN\")\n", + "en_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/EN\")\n", "en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)" ] }, @@ -680,29 +697,28 @@ "id": "b8775bb8-8ef8-4bd7-bdd3-7829cbd0b059", "metadata": {}, "source": [ - "NeMo Curator offers the `Modify` class to use with the `UnicodeReformatter` for text cleaning. It requires the following arguments:" + "Curator offers uses the `modify` method with `UnicodeReformatter` for text cleaning. It requires the following arguments:" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "3a6db96d-03fa-4085-a669-7170e4a3de6b", "metadata": { "tags": [] }, "outputs": [], "source": [ - "# Make directory for cleaned dataset\n", - "output_clean_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\"))\n", - "\n", - "# Specify text field name and file type\n", + "# make directory for cleaned dataset\n", + "output_clean_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\"))\n", + "# specify text field name and file type\n", "input_text_field = \"raw_content\"\n", "input_file_type = \"jsonl\"" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "47d8a85d-ab50-4917-9cf1-4dfa12bc1a35", "metadata": { "tags": [] @@ -719,17 +735,15 @@ ], "source": [ "t0 = time.time()\n", - "\n", - "# Specify cleaner\n", + "# specify clearner\n", "cleaner = nemo_curator.Modify(\n", " UnicodeReformatter(), \n", " text_field=input_text_field\n", ")\n", "\n", - "# Clean dataset and write to disk\n", + "# clean dataset and write to disk\n", "cleaned_dataset = cleaner(en_dataset)\n", "cleaned_dataset.to_json(output_clean_dir, write_to_filename=True)\n", - "\n", "print(f\"Text cleaning took {time.time()-t0} s\")" ] }, @@ -776,7 +790,7 @@ "id": "0b8120a1-100e-4527-b642-00e8ef298f1b", "metadata": {}, "source": [ - "Exact deduplication works by computing a hash value for the raw text of each document. Documents with the same hash value are considered exact duplicates and will be removed. NeMo Curator provides GPU-accelerated exact deduplication using NVIDIA RAPIDS." + "Exact dedup computes a hash for the raw text of each document. Documents with the same hash value will be exact duplicates and will be removed. Curator provides GPU-accelerated exact deduplication using Rapids." 
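To make the idea concrete, here is a minimal pandas-only sketch of hash-based exact deduplication on toy data; the `ExactDuplicates` module used below applies the same idea at scale with Dask-cuDF on GPUs:

```python
# Toy hash-based exact dedup: identical text -> identical MD5 hash -> duplicate.
import hashlib
import pandas as pd

df = pd.DataFrame({
    "id": ["doc-0", "doc-1", "doc-2"],
    "raw_content": ["hello world", "hello world", "something else"],
})
df["_hashes"] = df["raw_content"].map(lambda t: hashlib.md5(t.encode("utf-8")).hexdigest())

# Keep the first document in every hash group; the rest are exact duplicates.
deduped = df[~df["_hashes"].duplicated(keep="first")]
print(deduped[["id", "_hashes"]])
```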
] }, { @@ -796,7 +810,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "6532bd31-af79-4d1e-bfb3-5bf432f55ae5", "metadata": { "tags": [] @@ -812,7 +826,7 @@ } ], "source": [ - "scheduler_address = os.getenv(\"SCHEDULER_ADDRESS\")\n", + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", "gpu_client = get_client(scheduler_address=scheduler_address)\n", "print(f\"Num Workers = {get_num_workers(gpu_client)}\", flush=True)\n", "\n", @@ -822,37 +836,24 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "0e9af962-b0d7-4012-a05a-c8287f8e62d7", "metadata": { "tags": [] }, "outputs": [], "source": [ - "cleaned_dataset_path = os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\")\n", + "cleaned_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")\n", "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", - "input_id_field = \"id\"\n", - "input_text_field = \"raw_content\"\n", - "hash_method = \"md5\"\n", - "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-exact-dedup\"))" + "input_id_field = 'id'\n", + "input_text_field = 'raw_content'\n", + "hash_method = 'md5'\n", + "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-exact-dedup\"))" ] }, { "cell_type": "code", - "execution_count": null, - "id": "1d8bb155", - "metadata": {}, - "outputs": [], - "source": [ - "from nemo_curator.cache import initialize_cache_directory\n", - "\n", - "# Initialize cache directory for results\n", - "initialize_cache_directory(output_dir)" - ] - }, - { - "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "2d139334-1db2-41db-910f-b2a478824e04", "metadata": { "tags": [] @@ -869,20 +870,19 @@ ], "source": [ "t0 = time.time()\n", + "# Read the input dataset from the cleaned dataset dir\n", + "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, backend='cudf')\n", "\n", - "# Read the input dataset from the cleaned dataset directory\n", - "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, backend=\"cudf\")\n", - "\n", - "# Perform exact deduplication\n", + "# Perform exact dedup\n", "exact_dups = ExactDuplicates(\n", " logger=log_dir,\n", " id_field=input_id_field,\n", " text_field=input_text_field,\n", " hash_method=hash_method,\n", + " cache_dir=output_dir,\n", ")\n", "duplicates = exact_dups(dataset=input_dataset)\n", - "\n", - "print(f\"Exact dedup took: {time.time()-t0} s\")" + "print(f\"Exact dedup took:{time.time()-t0}\")\n" ] }, { @@ -892,7 +892,7 @@ "tags": [] }, "source": [ - "Exact deduplication found 97,327,867 duplicated documents:" + "Exact deduplication found 97,327,867 duplicated documents." ] }, { @@ -920,7 +920,7 @@ "id": "ddca7f66-4020-4f37-a8dc-200517dde329", "metadata": {}, "source": [ - "Let's see the results of exact deduplication:" + "Let's see the results of exact dedup:" ] }, { @@ -1010,26 +1010,96 @@ "id": "c3df3643", "metadata": {}, "source": [ - "We can sort the duplicate cluster by size and see that the largest cluster has 1,819 exact duplicates:" + "We can sort the duplicate cluster by size and see that the largest cluster has 1,819 exact duplicates." ] }, { "cell_type": "code", - "execution_count": null, - "id": "afa9c37e", - "metadata": {}, - "outputs": [], + "execution_count": 17, + "id": "4ac54290-dc1d-4f60-b768-f162d338ca47", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
count
_hashes
b7ba44a047ca570585d182d28d1e6bf81819
0469bde3868757d92af369c59992b9d91785
bdc1e82cba718a4717c683bf6a5541bd1784
f14149344e6519beaac2590b0535d2671771
f88eb7064d8e73c081af0731ba73c4511765
\n", + "
" + ], + "text/plain": [ + " count\n", + "_hashes \n", + "b7ba44a047ca570585d182d28d1e6bf8 1819\n", + "0469bde3868757d92af369c59992b9d9 1785\n", + "bdc1e82cba718a4717c683bf6a5541bd 1784\n", + "f14149344e6519beaac2590b0535d267 1771\n", + "f88eb7064d8e73c081af0731ba73c451 1765" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "duplicates_df.groupby(\"_hashes\") \\\n", - " .agg({\"id\": \"count\"}) \\\n", - " .rename(columns={\"id\": \"count\"}) \\\n", - " .sort_values(\"count\", ascending=False) \\\n", + "duplicates_df.groupby('_hashes') \\\n", + " .agg({'id': 'count'}) \\\n", + " .rename(columns={'id': 'count'}) \\\n", + " .sort_values('count', ascending=False) \\\n", " .head()" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 13, "id": "8b38f5a6-6f48-4081-a717-fb9a1b5e9539", "metadata": { "tags": [] @@ -1105,7 +1175,7 @@ } ], "source": [ - "dup_group = duplicates_df[duplicates_df[\"_hashes\"] == \"b7ba44a047ca570585d182d28d1e6bf8\"].compute()\n", + "dup_group = duplicates_df[duplicates_df['_hashes'] == 'b7ba44a047ca570585d182d28d1e6bf8'].compute()\n", "dup_group.head()" ] }, @@ -1114,12 +1184,12 @@ "id": "5cc83333-b19b-4335-92ef-6bcc29f3d7bf", "metadata": {}, "source": [ - "[Optional] Verify that documents with the same hash are exactly the same. We can use the IDs from the cell output above (IDs may change so revise the `dup_ids` as needed):" + "[Optional] Verify if the documents with the same hash are exactly the same. We can use the ids from the cell output above (ids may change so revise the `dup_ids` as needed):" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 16, "id": "ab1a6018-dead-4d22-b496-87b5afe56e7a", "metadata": { "tags": [] @@ -1135,8 +1205,8 @@ ], "source": [ "t0 = time.time()\n", - "dup_ids = [\"rpv2-2023-06-0962900660\", \"rpv2-2023-06-2417100276\", \"rpv2-2023-06-2936200328\"] \n", - "dup_examples = input_dataset.df[input_dataset.df[\"id\"].isin(dup_ids)].compute()\n", + "dup_ids = ['rpv2-2023-06-0962900660', 'rpv2-2023-06-2417100276', 'rpv2-2023-06-2936200328'] \n", + "dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()\n", "print(f\"Searching for example duplicates with specific IDs took {time.time()-t0} seconds\")" ] }, @@ -1161,9 +1231,9 @@ }, "outputs": [], "source": [ - "print(\"Example duplicate 1\\n\" + dup_examples.raw_content.iloc[0])\n", - "print(\"\\n\\nExample duplicate 2\\n\" + dup_examples.raw_content.iloc[1])\n", - "print(\"\\n\\nExample duplicate 3\\n\" + dup_examples.raw_content.iloc[2])" + "print('Example duplicate 1\\n' + dup_examples.raw_content.iloc[0])\n", + "print('\\n\\nExample duplicate 2\\n' + dup_examples.raw_content.iloc[1])\n", + "print('\\n\\nExample duplicate 3\\n' + dup_examples.raw_content.iloc[2])" ] }, { @@ -1176,7 +1246,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "49aae6f4-bfea-479e-ba3b-dbbc3321ae87", "metadata": { "tags": [] @@ -1192,14 +1262,14 @@ } ], "source": [ - "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, add_filename=True, backend=\"cudf\")\n", - "duplicates = DocumentDataset.read_parquet(os.path.join(output_dir, \"_exact_duplicates.parquet\"), backend=\"cudf\")\n", + "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, add_filename=True, backend='cudf')\n", + "duplicates = DocumentDataset.read_parquet(os.path.join(output_dir,\"_exact_duplicates.parquet\"), backend='cudf')\n", "duplicates_df = duplicates.df" ] 
}, { "cell_type": "code", - "execution_count": null, + "execution_count": 19, "id": "01d8c63a-fa8c-4b26-9978-d162fef8bd2b", "metadata": { "tags": [] @@ -1215,14 +1285,14 @@ } ], "source": [ - "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-exact-dup-removed\"))\n", + "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-exact-dup-removed\"))\n", "\n", "t0 = time.time()\n", "docs_to_remove = duplicates_df.map_partitions(\n", " lambda x: x[x._hashes.duplicated(keep=\"first\")]\n", ")\n", "\n", - "# When there are few duplicates, we can compute the results to a list and use `isin`\n", + "# When there are few duplicates we can compute the results to a list and use `isin`.\n", "result = input_dataset.df[\n", " ~input_dataset.df[input_id_field].isin(\n", " docs_to_remove[input_id_field].compute()\n", @@ -1236,7 +1306,7 @@ " output_type='jsonl',\n", ")\n", "\n", - "print(f\"Removing exact duplicates took: {time.time()-t0} s\")" + "print(f\"Removing exact duplicates took:{time.time()-t0}\")" ] }, { @@ -1244,7 +1314,7 @@ "id": "f6ad08cf-f114-4665-b03a-1778347ae636", "metadata": {}, "source": [ - "We can see that exact deduplication pipeline removed 70,675,782 documents, and we now have 1,017,635,738 documents left in the dataset." + "We can see that exact dedup removed 70,675,782 documents and we now have 1,017,635,738 documents left in the dataset." ] }, { @@ -1300,21 +1370,21 @@ "source": [ "## 5.2 Fuzzy Deduplication\n", "\n", - "Fuzzy deduplication aims to find near-duplicated documents in our dataset. Near-duplicated documents are common in web crawled data due to plagiarism and mirror sites. Removing them can help improve the quality of trained models. In many cases, we can skip exact deduplication and just perform fuzzy deduplication, as it will also find the exact duplicates. Thus, we will start with the cleaned dataset for fuzzy deduplication.\n", + "Fuzzy deduplication aims to find near-duplicated documents in our dataset. Near-duplicated documents are common in web crawl data due to plagiarism and mirror sites. Removing them can help improve the quality of trained models. In many cases, we can skip exact dedup and just perform fuzzy dedup as it will also find the exact duplicates. Thus, we will start with the cleaned dataset for fuzzy dedup.\n", "\n", - "NeMo Curator implements GPU-accelerated fuzzy deduplication by using a minhash and locality-sensitive hashing algorithm for finding similar documents across the dataset. Specifically, fuzzy deduplication includes six steps:\n", + "Curator implements GPU-accelerated Fuzzy Deduplication based on minhash + LSH algorithm for finding similar documents across the dataset. 
Specifically, Fuzzy Deduplication include six steps:\n", "\n", - "- Compute Minhashes\n", + "- Compute minhashes\n", "- Locality-Sensitive Hashing (LSH)\n", - "- Map Buckets\n", - "- Jaccard Shuffle\n", - "- Jaccard Compute\n", - "- Connected Components" + "- Map buckets\n", + "- Jaccard shuffle\n", + "- Jaccard compute\n", + "- Connected components\n" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "750b1c02-2b37-474f-aaa2-2de86ac3a9e7", "metadata": { "tags": [] @@ -1333,7 +1403,7 @@ "def pre_imports():\n", " import cudf # noqa: F401\n", "\n", - "scheduler_address = os.getenv(\"SCHEDULER_ADDRESS\")\n", + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", "gpu_client = get_client(scheduler_address=scheduler_address)\n", "print(f\"Num Workers = {get_num_workers(gpu_client)}\", flush=True)\n", "\n", @@ -1346,14 +1416,14 @@ "id": "eb279812-e11f-4e5e-bc38-cd4189201be9", "metadata": {}, "source": [ - "### 5.2.1 Compute Minhashes\n", + "### 5.2.1 Compute minhashes\n", "\n", - "First, we will compute a minhash signature for each document. For this, each document will be represented by a set of n-grams. We will apply random hash functions on each element of the set. The minimum hash value generated by each hash function will be recorded and become a component of the minhash signature. Thus, the length of the minhash signature will be the same as the number of hash functions." + "First, we will compute the minhash signature for each documents. For this purpose, each document will be represented by a set of n-grams. We will apply random hash functions on each element of the set. The minimum hash value generated by each hash function will be recorded and becomes a component of the MinHash signature. Thus, the length of the minhash signature will be the same as the number of hash functions. 
" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "id": "8ad04e90-a1b6-4881-a29a-dc0c63b026bc", "metadata": { "tags": [] @@ -1361,18 +1431,15 @@ "outputs": [], "source": [ "from nemo_curator import MinHash\n", - "from nemo_curator.cache import initialize_cache_directory\n", "\n", - "input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\")\n", + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")\n", "seed = 42\n", "minhash_length = 260\n", "char_ngram = 5\n", "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", - "\n", - "minhash_output_dir = os.path.join(base_dir,\"rpv2-2023-06-minhash\")\n", - "initialize_cache_directory(minhash_output_dir)" + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "minshah_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-minhash\"))" ] }, { @@ -1405,7 +1472,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 33, "id": "897b88bf-f49f-46a0-b19d-c3688cb056f2", "metadata": { "tags": [] @@ -1422,7 +1489,7 @@ "source": [ "t0 = time.time()\n", "\n", - "# Run MinHash on input data\n", + "# Run MinHash() on input data\n", "minhasher = MinHash(\n", " seed=seed,\n", " num_hashes=minhash_length,\n", @@ -1431,10 +1498,12 @@ " logger=log_dir,\n", " id_field=id_field,\n", " text_field=text_field,\n", + " cache_dir=minshah_output_dir\n", ")\n", + "\n", "result = minhasher(DocumentDataset(df)).df\n", "\n", - "print(f\"Computing minhashes took:{time.time()-t0} s\")" + "print(f\"Computing minhashes took:{time.time()-t0}\")" ] }, { @@ -1442,7 +1511,7 @@ "id": "8a198426-29d2-4e60-9468-e2839d634d18", "metadata": {}, "source": [ - "We can see some example outputs from the minhash computation:" + "We can see some example outputs from the minhash computation." ] }, { @@ -1539,18 +1608,18 @@ "id": "dcc1c210-2670-48b7-90d7-8105e274e1a8", "metadata": {}, "source": [ - "The `LSH` class implements an algorithm which includes the following steps:\n", + "LSH() implements LSH algorithm which includes the following steps:\n", "\n", "- Divide the minhash signature array into X different portions.\n", "\n", - "- For each portion, hash the minhash values into buckets. One document will be assigned to X buckets.\n", + "- For each portions, hash the minhash values into buckets. One document will be assigned to X buckets.\n", "\n", - "- Documents within the same bucket will be deemed similar. Since every document will be assigned X buckets and as long as two documents share 1 or more buckets they are deemed similar, the result of LSH will have more false positive predictions as compared to false negative predictions. The false positive cases will be filtered in following modules, namely the Jaccard compute step." + "- Documents within the same bucket will be deemed similar. Since every document will be assigned X buckets and as long as two documents share 1 or more buckets they are deemed similar, the result of LSH will have more false positive as compared to false negative. The false positive cases will be filtered in following modules, namely jaccard compute." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 14, "id": "89b62629-6d54-43ce-8995-49a89d89859f", "metadata": { "tags": [] @@ -1560,21 +1629,19 @@ "from nemo_curator import LSH\n", "from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int\n", "\n", - "lsh_input_dir = os.path.join(base_dir, \"rpv2-2023-06-minhash\")\n", - "id_field = \"id\"\n", + "lsh_input_dir = os.path.join(base_dir,\"rpv2-2023-06-minhash\")\n", + "id_field = 'id'\n", + "output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06\"))\n", "num_bands = 20\n", "buckets_per_shuffle = 1\n", - "minhash_field = \"_minhash_signature\"\n", + "minhash_field = '_minhash_signature'\n", "minhash_length = 260\n", - "log_dir = os.path.join(base_dir, \"logs\")\n", - "\n", - "output_bucket_dir = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06\")\n", - "initialize_cache_directory(output_bucket_dir)" + "log_dir = os.path.join(base_dir, \"logs\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "7ccb7591-e5ef-482a-b226-a612641f90d0", "metadata": { "tags": [] @@ -1591,7 +1658,7 @@ "source": [ "t0 = time.time()\n", "\n", - "# Load MinHash output\n", + "#Load MinHash output\n", "df = dask_cudf.read_parquet(lsh_input_dir, blocksize=\"2GB\", aggregate_files=True)\n", "df = df.map_partitions(\n", " convert_str_id_to_int,\n", @@ -1602,6 +1669,7 @@ ")\n", "\n", "lsh = LSH(\n", + " cache_dir=output_bucket_dir,\n", " num_hashes=minhash_length,\n", " num_buckets=num_bands,\n", " buckets_per_shuffle=buckets_per_shuffle,\n", @@ -1708,7 +1776,8 @@ "source": [ "### 5.2.3 Map Buckets\n", "\n", - "After performing LSH, we need to process each bucket and calculate an approximation of the Jaccard similarity scores between all pairs in order to remove false positive duplicates introduced by LSH. For this purpose, we will randomly sample N \"anchor\" documents within each bucket and calculate its Jaccard similarity scores with every document remaining in the bucket." + "After performing LSH, we processed each bucket and calculated an approximation of the all-pairs Jaccard\n", + "similarity in order to remove false positive duplicates introduced by LSH. For this purpose, we will randomly sample n \"anchor\" documents within each buckets and calculate the Jaccard similarity with everything remaining in the bucket." 
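As a small plain-Python illustration of the Jaccard score computed between an anchor document and the other members of its bucket (toy strings of our own; the pipeline computes this at scale with the `JaccardSimilarity` module on character n-grams):

```python
# Toy Jaccard similarity on character 5-grams between an anchor and its bucket members.
def char_ngrams(text, n=5):
    return {text[i:i + n] for i in range(len(text) - n + 1)}

def jaccard(a, b, n=5):
    sa, sb = char_ngrams(a, n), char_ngrams(b, n)
    return len(sa & sb) / len(sa | sb)

anchor = "the quick brown fox jumps over the lazy dog"
bucket = [
    "the quick brown fox jumped over the lazy dog",         # near duplicate
    "an entirely different document about data curation",   # LSH false positive
]
for doc in bucket:
    print(round(jaccard(anchor, doc), 3))
# Pairs scoring below the chosen threshold (0.8 later in this tutorial) are discarded as false positives.
```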
] }, { @@ -1726,21 +1795,21 @@ " get_text_ddf_from_json_path_with_blocksize,\n", ")\n", "\n", - "input_data_paths = [os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\")]\n", + "input_data_paths = [os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")]\n", "num_files = None\n", - "text_ddf_blocksize = 256 # The block size for chunking jsonl files for the ddf in MB\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", - "input_bucket_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/_buckets.parquet\")\n", - "input_bucket_field = \"_bucket_id\"\n", - "shuffle_type = \"tasks\"\n", + "text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "input_bucket_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/_buckets.parquet\")\n", + "input_bucket_field = '_bucket_id'\n", + "shuffle_type ='tasks'\n", "log_dir = os.path.join(base_dir, \"logs\")\n", - "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\"))" + "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\"))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 12, "id": "f94be7c9-27ea-4f1f-8a8f-05a866eafac3", "metadata": { "tags": [] @@ -1756,7 +1825,7 @@ } ], "source": [ - "# Read jsonl input data\n", + "# Read .jsonl input data\n", "ddf_text = get_text_ddf_from_json_path_with_blocksize(\n", " input_data_paths=input_data_paths,\n", " num_files=num_files,\n", @@ -1765,12 +1834,12 @@ " text_column=text_field,\n", ")\n", "\n", - "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" + "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 15, "id": "a0700327-5171-4673-8ded-c9f43492f582", "metadata": { "tags": [] @@ -1789,13 +1858,13 @@ "t0 = time.time()\n", "num_workers = get_num_workers(gpu_client)\n", "\n", - "# Read _buckets.parquet\n", + "# Read \"_buckets.parquet\"\n", "ddf_bk = get_bucket_ddf_from_parquet_path(\n", " input_bucket_path=input_bucket_path, \n", " num_workers=num_workers\n", ")\n", "\n", - "# Run _MapBuckets\n", + "#Run _MapBuckets()\n", "map_buckets = _MapBuckets(\n", " id_fields=[\"dataset_id\", \"doc_id\"], \n", " bucket_field=input_bucket_field, \n", @@ -1809,13 +1878,13 @@ " shuffle_type=shuffle_type\n", ")\n", "\n", - "# Write to disk\n", + "#Write to disk\n", "ddf_anchor_docs_with_bk.to_parquet(\n", " output_anchor_docs_with_bk_path, \n", " write_index=False\n", ")\n", "\n", - "print(f\"Mapping buckets took {time.time()-t0} s\")" + "print(f\"Mapping Bucket took {time.time()-t0} s\")" ] }, { @@ -1943,7 +2012,7 @@ "source": [ "### 5.2.4 Jaccard Shuffle\n", "\n", - "Next, we shuffle the documents within the dataset based on their bucket assignments, essentially distributing similar documents across different partitions or workers, enabling efficient parallel processing and deduplication in subsequent steps." + "We shuffle the documents within the dataset based on their bucket assignments, essentially distributing similar documents across different partitions or workers, enabling efficient parallel processing and deduplication in subsequent steps." 
] }, { @@ -1958,20 +2027,20 @@ "from nemo_curator.modules.fuzzy_dedup._shuffle import _Shuffle\n", "\n", "log_dir = os.path.join(base_dir, \"logs\")\n", - "input_anchor_docs_with_bk_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\")\n", + "input_anchor_docs_with_bk_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\")\n", "output_shuffled_docs_path = expand_outdir_and_mkdir(\n", " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/shuffled_docs.parquet\")\n", ")\n", "bucket_mapping_ddf_blocksize = 256\n", "parts_per_worker = 16\n", "bucket_parts_per_worker = 256\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"" + "id_field = 'id'\n", + "text_field = 'raw_content'" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 24, "id": "07894e81-6cdc-4292-951a-977b220fbd81", "metadata": { "collapsed": true, @@ -2393,7 +2462,7 @@ " partition_on=\"_output_partition_id\",\n", ")\n", "\n", - "print(f\"Jaccard shuffle took {time.time()-t0} s\")" + "print(f\"Jaccard Shuffle took {time.time()-t0} s\")" ] }, { @@ -2401,7 +2470,7 @@ "id": "26739f23-47f1-4e11-ac49-82920f534495", "metadata": {}, "source": [ - "We can visualize the Jaccard shuffle results for a single partition:" + "We can visualize the jaccard shuffle results for a single partition:" ] }, { @@ -2413,7 +2482,7 @@ }, "outputs": [], "source": [ - "jaccard_shuffle_res = dd.read_parquet(os.path.join(output_shuffled_docs_path, \"_output_partition_id=0\"))\n", + "jaccard_shuffle_res = dd.read_parquet(os.path.join(output_shuffled_docs_path,\"_output_partition_id=0\"))\n", "jaccard_shuffle_res.head()" ] }, @@ -2424,7 +2493,7 @@ "source": [ "### 5.2.5 Jaccard Compute\n", "\n", - "Now that we have sampled the Jaccard pairs, we can compute the Jaccard similarity score for all pairs." + "Now we have the jaccard pairs sampled, we can compute the Jaccard similarity score for all pairs." ] }, { @@ -2438,8 +2507,8 @@ "source": [ "from nemo_curator import JaccardSimilarity\n", "\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", "ngram_size = 5\n", "shuffled_docs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/shuffled_docs.parquet\")\n", "jaccard_results_path = expand_outdir_and_mkdir(\n", @@ -2449,7 +2518,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "d31a24fe-8229-48bc-89f7-32f2b93f4f5c", "metadata": { "tags": [] @@ -2481,7 +2550,7 @@ " write_metadata_file=False,\n", ")\n", "\n", - "print(f\"Jaccard computing and writing took {time.time() - t0} seconds\")" + "print(f\"Jaccard Computing+Writing took {time.time() - t0} seconds\")" ] }, { @@ -2577,9 +2646,11 @@ "id": "fb9e1287-bd19-4728-a4c8-b92b39ca1fcc", "metadata": {}, "source": [ - "### 5.2.6 Connected Components\n", + "### 5.2.6 Connected Component\n", "\n", - "After all buckets are processed and duplicates (at the threshold) are approximately discovered, we construct a sparse document graph and find the connected components therein. Each connected component represents a set of documents that we consider similar enough to be duplicates, and\n", + "After all buckets were processed and duplicates (at the threshold) were approximately discovered,\n", + "we constructed a sparse document graph and found the connected components therein (using scipy). 
Each\n", + "connected component represents a set of documents that we consider similar enough to be duplicates, and\n", "from which we select a single representative." ] }, @@ -2594,10 +2665,11 @@ "source": [ "from nemo_curator import ConnectedComponents\n", "\n", - "cache_dir = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06\")\n", - "initialize_cache_directory(cache_dir)\n", - "\n", - "id_field = \"id\"\n", + "cache_dir = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/cc-cache\")\n", + ")\n", + "jaccard_pairs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/jaccard_similarity_results.parquet\")\n", + "id_field = 'id'\n", "jaccard_threshold = 0.8\n", "output_path = expand_outdir_and_mkdir(\n", " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/connected_components.parquet\")\n", @@ -2659,15 +2731,14 @@ ], "source": [ "t0 = time.time()\n", - "\n", "components_stage = ConnectedComponents(\n", + " cache_dir=cache_dir,\n", + " jaccard_pairs_path=jaccard_pairs_path,\n", " id_column=id_field,\n", " jaccard_threshold=jaccard_threshold,\n", - " false_positive_check=True,\n", ")\n", "components_stage.cc_workflow(output_path=output_path)\n", - "\n", - "print(f\"Connected components took {time.time()-t0} seconds\")" + "print(f\"Connected Component took {time.time()-t0} seconds\")" ] }, { @@ -2675,12 +2746,12 @@ "id": "15214dcf-ff49-439e-b3d7-d8666d081027", "metadata": {}, "source": [ - "Let's check the results of connected components step. We can see that 239,037,733 documents were identified as duplicates to be removed." + "Let's check the results of connected components step. We can see that 239,037,733 are identified as duplicates to be removed." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "94e8126d-af15-4182-98cd-10df06e9778e", "metadata": {}, "outputs": [ @@ -2696,14 +2767,14 @@ "output_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/connected_components.parquet\")\n", "cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)\n", "\n", - "# Set \"group\" as the index and shuffle to ensure all same \"group\" values are in the same partition\n", - "cc_result = cc_result.set_index(\"group\", shuffle=\"tasks\")\n", + "# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition\n", + "cc_result = cc_result.set_index('group', shuffle='tasks')\n", "\n", "# Define a function to assign cumulative counts and filter duplicates\n", "def assign_cumcount(df):\n", - " df[\"cumcount\"] = df.groupby(level=0).cumcount()\n", - " df = df[df[\"cumcount\"] >= 1]\n", - " df = df.drop(columns=[\"cumcount\"])\n", + " df['cumcount'] = df.groupby(level=0).cumcount()\n", + " df = df[df['cumcount'] >= 1]\n", + " df = df.drop(columns=['cumcount'])\n", " return df\n", "\n", "# Find duplicates by applying the function to each partition\n", @@ -2713,14 +2784,12 @@ "docs_to_remove = docs_to_remove.reset_index()\n", "\n", "docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n", - "docs_to_remove = docs_to_remove.rename(\n", - " columns={\"dataset_id\": \"to_remove_dataset_id\", \"doc_id\": \"to_remove_doc_id\"}\n", - ")\n", + "docs_to_remove = docs_to_remove.rename(columns={\"dataset_id\":\"to_remove_dataset_id\", \"doc_id\":\"to_remove_doc_id\"})\n", "docs_to_remove = docs_to_remove.reset_index(drop=True).persist()\n", "_ = wait(docs_to_remove)\n", "del _ \n", "\n", - "print(\"Num of docs to remove =\", len(docs_to_remove))" + 
"print(\"num of docs to remove =\", len(docs_to_remove))" ] }, { @@ -2733,7 +2802,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "cae7f166-836a-4c21-bff2-7453254956b7", "metadata": { "tags": [] @@ -2808,7 +2877,7 @@ } ], "source": [ - "cc_grouped = cc_result.groupby(\"group\").agg({\"doc_id\": \"count\"}).rename(columns={\"doc_id\": \"count\"}).sort_values(\"count\", ascending=False).compute()\n", + "cc_grouped = cc_result.groupby('group').agg({'doc_id': 'count'}).rename(columns={'doc_id': 'count'}).sort_values('count', ascending=False).compute()\n", "cc_grouped.head()" ] }, @@ -2817,7 +2886,7 @@ "id": "0def7323-3d2c-4861-9b7e-a1e296ccf329", "metadata": {}, "source": [ - "[Optional] Verify that fuzzy duplicates are similar. For example, we can look into the largest group \"350652173\":" + "[Optional] Verify if fuzzy duplicates are similar. For example, we can look into the largest group \"350652173\"." ] }, { @@ -2918,12 +2987,22 @@ }, { "cell_type": "code", - "execution_count": null, - "id": "8b7e3037", - "metadata": {}, - "outputs": [], + "execution_count": 4, + "id": "00cf923e-fd4e-41b9-a00f-801c186ac70e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + } + ], "source": [ - "# Read input dataset\n", + "# read input dataset\n", "input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\")\n", "input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)" ] @@ -2933,12 +3012,12 @@ "id": "9772bf71-9e18-4e59-b9f8-ebd9053c79b0", "metadata": {}, "source": [ - "Let's visualize the content of these documents and see if they are similar (IDs may change so revise the `dup_ids` as needed)." + "Let's visualize the content of these documents and see if they are similar (ids may change so revise the `dup_ids` as needed)." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "id": "e3cc167f-30f8-470d-99e3-0a2d916d46bf", "metadata": { "tags": [] @@ -2954,16 +3033,14 @@ ], "source": [ "t0 = time.time()\n", - "\n", "dup_ids = [\n", - " \"rpv2-2023-06-1285625132\",\n", - " \"rpv2-2023-06-2033200488\",\n", - " \"rpv2-2023-06-0428016172\",\n", - " \"rpv2-2023-06-1268721963\",\n", - " \"rpv2-2023-06-1285428574\",\n", + " 'rpv2-2023-06-1285625132',\n", + " 'rpv2-2023-06-2033200488',\n", + " 'rpv2-2023-06-0428016172',\n", + " 'rpv2-2023-06-1268721963',\n", + " 'rpv2-2023-06-1285428574'\n", "] \n", - "dup_examples = input_dataset.df[input_dataset.df[\"id\"].isin(dup_ids)].compute()\n", - "\n", + "dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()\n", "print(f\"Searching for near duplicate examples with specific IDs took {time.time()-t0} seconds\")" ] }, @@ -2988,11 +3065,11 @@ }, "outputs": [], "source": [ - "print(\"Example duplicate 1\\n\" + dup_examples.raw_content.iloc[0])\n", - "print(\"\\n\\nExample duplicate 2\\n\" + dup_examples.raw_content.iloc[1])\n", - "print(\"\\n\\nExample duplicate 3\\n\" + dup_examples.raw_content.iloc[2])\n", - "print(\"\\n\\nExample duplicate 4\\n\" + dup_examples.raw_content.iloc[3])\n", - "print(\"\\n\\nExample duplicate 4\\n\" + dup_examples.raw_content.iloc[4])" + "print('Example duplicate 1\\n' + dup_examples.raw_content.iloc[0])\n", + "print('\\n\\nExample duplicate 2\\n' + dup_examples.raw_content.iloc[1])\n", + "print('\\n\\nExample duplicate 3\\n' + dup_examples.raw_content.iloc[2])\n", + "print('\\n\\nExample duplicate 4\\n' + dup_examples.raw_content.iloc[3])\n", + "print('\\n\\nExample duplicate 4\\n' + dup_examples.raw_content.iloc[4])" ] }, { @@ -3008,12 +3085,12 @@ "id": "f2e01b84-07cc-45a3-9dde-97884d1922a3", "metadata": {}, "source": [ - "Next, we will remove the duplicates identified from the dataset. We will first change the string ID to `doc_id` and `dataset_id` in the input dataset." + "Next, we will proceed to remove the duplicates identified from the dataset. We will first change the string ID to `doc_id` and `dataset_id` in the input dataset." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "7cde97c5-cfaa-4096-8d9f-1ffa19c4adb2", "metadata": { "tags": [] @@ -3031,10 +3108,10 @@ "from helper import convert_str_id_to_int\n", "\n", "input_dataset = DocumentDataset.read_json(os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\"), backend=\"cudf\")\n", - "input_df = input_dataset.df[[\"raw_content\", \"id\"]]\n", + "input_df = input_dataset.df[['raw_content','id']]\n", "meta = input_df._meta\n", - "meta[\"doc_id\"]=np.int64([0])\n", - "meta[\"dataset_id\"]=np.uint32([0])\n", + "meta['doc_id']=np.int64([0])\n", + "meta['dataset_id']=np.uint32([0])\n", "input_df = input_df.map_partitions(\n", " convert_str_id_to_int,\n", " id_column=\"id\",\n", @@ -3052,7 +3129,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "f819d1d3-c4c0-4288-b190-86276d221050", "metadata": { "tags": [] @@ -3069,15 +3146,15 @@ "source": [ "dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-deduped\"))\n", "deduped_df = input_df.merge(docs_to_remove,\n", - " left_on=[\"doc_id\", \"dataset_id\"],\n", + " left_on=['doc_id','dataset_id'],\n", " right_on=[\"to_remove_doc_id\", \"to_remove_dataset_id\"],\n", - " how=\"left\")\n", + " how='left')\n", "\n", - "deduped_df = deduped_df[deduped_df[\"to_remove_doc_id\"].isna()].drop(columns=[\"to_remove_doc_id\", \"to_remove_dataset_id\"]).reset_index(drop=True)\n", + "deduped_df = deduped_df[deduped_df['to_remove_doc_id'].isna()].drop(columns=['to_remove_doc_id', \"to_remove_dataset_id\"]).reset_index(drop=True)\n", "\n", "t0 = time.time()\n", "deduped_df.to_parquet(dedup_output_dir)\n", - "print(f\"Removing duplicates and writing deduplicated dataset took {time.time()-t0} seconds\")" + "print(f\"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds\")" ] }, { @@ -3139,7 +3216,7 @@ "id": "cdc6350e-2363-4b13-ac67-0cbc23ad981d", "metadata": {}, "source": [ - "## 5.3 Inter-Snapshot Deduplication" + "## 5.3 Inter-snapshot Deduplication" ] }, { @@ -3147,9 +3224,9 @@ "id": "888c2b15-961f-4a73-a0a3-15474ae4134c", "metadata": {}, "source": [ - "So far we have deduplicated a single snapshot from rpv2. Pre-training datasets include multiple snapshots, so we will often need to perform inter-snapshot deduplication. For this tutorial, we will demonstrate deduplication across two snapshots as an example.\n", + "So far we have deduplicated a single snapshot from rpv2. Pre-training dataet include multiple snapshots so we will often need to perform inter-snapshot deduplication. For this tutorial, we will demostrate deduplication across two snapshots as an example.\n", "\n", - "We will first perform all of the above steps for another snapshot `2023-14`, and then combine the two deduplicated datasets and store them as `rpv2-2023-06-and-14-deduped`.\n", + "We first performed all the above steps for another snapshot `2023-14` and then combined the two deduped datasets into one and stored them in `rpv2-2023-06-and-14-deduped`.\n", "\n", "Next, we will perform the fuzzy deduplication on the combined dataset." 
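The step that combines the two deduplicated snapshots is not shown here. One possible sketch, assuming both snapshots were written out as Parquet and using an assumed directory name for the second snapshot, is:

```python
# Hypothetical sketch: concatenate two deduplicated snapshots into one combined dataset.
import os
import dask.dataframe as dd
from nemo_curator.datasets import DocumentDataset

snapshot_dirs = [
    os.path.join(base_dir, "rpv2-2023-06-deduped"),
    os.path.join(base_dir, "rpv2-2023-14-deduped"),  # assumed path for the second snapshot
]
combined_dir = os.path.join(base_dir, "rpv2-2023-06-and-14-deduped")

combined_df = dd.concat([DocumentDataset.read_parquet(path).df for path in snapshot_dirs])
DocumentDataset(combined_df).to_json(combined_dir)
```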
] @@ -3178,7 +3255,6 @@ "from nemo_curator import ConnectedComponents\n", "from nemo_curator import JaccardSimilarity\n", "\n", - "from nemo_curator.cache import initialize_cache_directory\n", "from nemo_curator.utils.file_utils import reshard_jsonl\n", "from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int\n", "from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (\n", @@ -3189,7 +3265,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 23, "id": "efaed1ed-e6d1-4117-9b0b-fe0d20960b60", "metadata": { "tags": [] @@ -3200,28 +3276,26 @@ "minhash_length = 260\n", "char_ngram = 5\n", "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", - "\n", - "minhash_output_dir = os.path.join(base_dir, \"rpv2-2023-06-and-14-minhash\")\n", - "initialize_cache_directory(minhash_output_dir)" + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "minhash_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-and-14-minhash\"))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "id": "b4bf2d09-6601-4bd2-a6f2-f738cffd8885", "metadata": { "tags": [] }, "outputs": [], "source": [ - "input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped\")\n", + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-and-14-deduped\")\n", "\n", "files = []\n", "for file in os.listdir(input_data_dir):\n", - " if file.endswith(\".part\"):\n", - " new_file = file.replace(\".part\", \".jsonl\")\n", + " if file.endswith('.part'):\n", + " new_file = file.replace('.part', '.jsonl')\n", " old_file_path = os.path.join(input_data_dir, file)\n", " new_file_path = os.path.join(input_data_dir, new_file)\n", " os.rename(old_file_path, new_file_path)\n", @@ -3257,7 +3331,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 24, "id": "439add9c-9f51-4481-95cf-456dc5be9fd2", "metadata": { "tags": [] @@ -3274,7 +3348,7 @@ "source": [ "t0 = time.time()\n", "\n", - "# Run MinHash on input data\n", + "# Run MinHash() on input data\n", "minhasher = MinHash(\n", " seed=seed,\n", " num_hashes=minhash_length,\n", " char_ngrams=char_ngram,\n", " use_64bit_hash=use_64bit_hash,\n", " logger=log_dir,\n", " id_field=id_field,\n", " text_field=text_field,\n", + " cache_dir=minhash_output_dir\n", ")\n", "\n", "result = minhasher(DocumentDataset(df)).df\n", "\n", - "print(f\"Computing minhashes took {time.time()-t0} s\")" + "print(f\"Computing minhashes took:{time.time()-t0}\")" ] }, { @@ -3381,28 +3456,26 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "b11c2f37-3b78-4e1b-a9ff-4a89b38f3604", "metadata": { "tags": [] }, "outputs": [], "source": [ - "lsh_input_dir = os.path.join(base_dir, \"rpv2-2023-06-and-14-minhash\")\n", - "output_bucket_dir = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14\")\n", - "initialize_cache_directory(output_bucket_dir)\n", - "\n", - "id_field = \"id\"\n", + "lsh_input_dir = os.path.join(base_dir,\"rpv2-2023-06-and-14-minhash\")\n", + "id_field = 'id'\n", + "output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14\"))\n", "num_bands = 20\n", "buckets_per_shuffle = 1\n", - "minhash_field = \"_minhash_signature\"\n", + "minhash_field = '_minhash_signature'\n", "minhash_length = 260\n", "log_dir = os.path.join(base_dir, \"logs\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "id": "a243ed7a-9175-488f-8097-5b82c47c5708",
"metadata": { "tags": [] @@ -3419,7 +3492,7 @@ "source": [ "t0 = time.time()\n", "\n", - "# Load MinHash output\n", + "#Load MinHash output\n", "df = dask_cudf.read_parquet(lsh_input_dir, blocksize=\"2GB\", aggregate_files=True)\n", "df = df.map_partitions(\n", " convert_str_id_to_int,\n", @@ -3430,6 +3503,7 @@ ")\n", "\n", "lsh = LSH(\n", + " cache_dir=output_bucket_dir,\n", " num_hashes=minhash_length,\n", " num_buckets=num_bands,\n", " buckets_per_shuffle=buckets_per_shuffle,\n", @@ -3539,28 +3613,28 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "2cc909e1-e02e-433d-b6c2-e7f82a137438", "metadata": { "tags": [] }, "outputs": [], "source": [ - "input_data_paths = [os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped\")]\n", + "input_data_paths = [os.path.join(base_dir,\"rpv2-2023-06-and-14-deduped\")]\n", "num_files = None\n", - "text_ddf_blocksize = 256 # The block size for chunking jsonl files for the ddf in MB\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", - "input_bucket_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/_buckets.parquet\")\n", - "input_bucket_field = \"_bucket_id\"\n", - "shuffle_type = \"tasks\"\n", + "text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "input_bucket_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14/_buckets.parquet\")\n", + "input_bucket_field = '_bucket_id'\n", + "shuffle_type ='tasks'\n", "log_dir = os.path.join(base_dir, \"logs\")\n", - "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet\"))" + "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet\"))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "3effff2a-f01d-4f33-b495-97455a280a59", "metadata": { "tags": [] @@ -3576,7 +3650,7 @@ } ], "source": [ - "# Read jsonl input data\n", + "# Read .jsonl input data\n", "ddf_text = get_text_ddf_from_json_path_with_blocksize(\n", " input_data_paths=input_data_paths,\n", " num_files=num_files,\n", @@ -3585,12 +3659,12 @@ " text_column=text_field,\n", ")\n", "\n", - "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" + "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 14, "id": "e4e00d8e-170c-4adf-bf34-1ad1b5275760", "metadata": { "tags": [] @@ -3609,13 +3683,13 @@ "t0 = time.time()\n", "num_workers = get_num_workers(gpu_client)\n", "\n", - "# Read _buckets.parquet\n", + "# Read \"_buckets.parquet\"\n", "ddf_bk = get_bucket_ddf_from_parquet_path(\n", " input_bucket_path=input_bucket_path, \n", " num_workers=num_workers\n", ")\n", "\n", - "# Run _MapBuckets\n", + "#Run _MapBuckets()\n", "map_buckets = _MapBuckets(\n", " id_fields=[\"dataset_id\", \"doc_id\"], \n", " bucket_field=input_bucket_field, \n", @@ -3629,13 +3703,13 @@ " shuffle_type=shuffle_type\n", ")\n", "\n", - "# Write to disk\n", + "#Write to disk\n", "ddf_anchor_docs_with_bk.to_parquet(\n", " output_anchor_docs_with_bk_path, \n", " write_index=False\n", ")\n", "\n", - "print(f\"Mapping buckets took {time.time()-t0} s\")" + "print(f\"Mapping Bucket took {time.time()-t0} s\")" ] }, { @@ -3766,7 +3840,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, 
"id": "8cbce892-ec9c-46e3-9fbe-09f9a243a7aa", "metadata": { "tags": [] @@ -3781,13 +3855,13 @@ "bucket_mapping_ddf_blocksize = 256\n", "parts_per_worker = 16\n", "bucket_parts_per_worker = 256\n", - "id_field = \"id\"\n", - "text_field = \"raw_content\"" + "id_field = 'id'\n", + "text_field = 'raw_content'" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "id": "1acd3e4a-0310-4413-98b2-07cd7c74ee57", "metadata": { "collapsed": true, @@ -4231,7 +4305,7 @@ " partition_on=\"_output_partition_id\",\n", ")\n", "\n", - "print(f\"Jaccard shuffle took {time.time()-t0} s\")" + "print(f\"Jaccard Shuffle took {time.time()-t0} s\")" ] }, { @@ -4244,15 +4318,15 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "id": "d3890884-868e-473e-932d-02a17739b492", "metadata": { "tags": [] }, "outputs": [], "source": [ - "id_field = \"id\"\n", - "text_field = \"raw_content\"\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", "ngram_size = 5\n", "shuffled_docs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/shuffled_docs.parquet\")\n", "jaccard_results_path = expand_outdir_and_mkdir(\n", @@ -4262,7 +4336,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "id": "117cd113-c8f8-46a3-8173-231faff3b69d", "metadata": { "tags": [] @@ -4278,7 +4352,6 @@ ], "source": [ "t0 = time.time()\n", - "\n", "jaccard = JaccardSimilarity(\n", " id_field=id_field ,\n", " text_field=text_field,\n", @@ -4295,7 +4368,7 @@ " write_metadata_file=False,\n", ")\n", "\n", - "print(f\"Jaccard computing and writing took {time.time() - t0} seconds\")" + "print(f\"Jaccard Computing+Writing took {time.time() - t0} seconds\")" ] }, { @@ -4303,21 +4376,23 @@ "id": "c2307ab3-6d75-4e96-b2f3-dc180aed06ef", "metadata": {}, "source": [ - "### 5.3.6 Connected Components" + "### 5.3.6 Connected Component" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "fd602c6e-39e5-45fe-9031-ea3926d68a68", "metadata": { "tags": [] }, "outputs": [], "source": [ - "cache_dir = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14\")\n", - "initialize_cache_directory(cache_dir)\n", - "id_field = \"id\"\n", + "cache_dir = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/cc-cache\")\n", + ")\n", + "jaccard_pairs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/jaccard_similarity_results.parquet\")\n", + "id_field = 'id'\n", "jaccard_threshold = 0.8\n", "output_path = expand_outdir_and_mkdir(\n", " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/connected_components.parquet\")\n", @@ -4360,15 +4435,14 @@ ], "source": [ "t0 = time.time()\n", - "\n", "components_stage = ConnectedComponents(\n", + " cache_dir=cache_dir,\n", + " jaccard_pairs_path=jaccard_pairs_path,\n", " id_column=id_field,\n", " jaccard_threshold=jaccard_threshold,\n", - " false_positive_check=True,\n", ")\n", "components_stage.cc_workflow(output_path=output_path)\n", - "\n", - "print(f\"Connected components took {time.time()-t0} seconds\")" + "print(f\"Connected Component took {time.time()-t0} seconds\")" ] }, { @@ -4384,12 +4458,12 @@ "id": "5aa8c531-10ec-448d-8413-f059b796226d", "metadata": {}, "source": [ - "From the outputs of the connect components step, we can see that inter-snapshot deduplication found 81,764,804 duplicates." + "From the outputs of the Connect Component step, we can see that inter-snapshot dedup found 81,764,804 duplicates." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "d2bd2d15-5c09-4fb1-a289-ce228f90713e", "metadata": { "tags": [] @@ -4407,14 +4481,14 @@ "output_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/connected_components.parquet\")\n", "cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)\n", "\n", - "# Set \"group\" as the index and shuffle to ensure all same \"group\" values are in the same partition\n", - "cc_result = cc_result.set_index(\"group\", shuffle=\"tasks\")\n", + "# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition\n", + "cc_result = cc_result.set_index('group', shuffle='tasks')\n", "\n", "# Define a function to assign cumulative counts and filter duplicates\n", "def assign_cumcount(df):\n", - " df[\"cumcount\"] = df.groupby(level=0).cumcount()\n", - " df = df[df[\"cumcount\"] >= 1]\n", - " df = df.drop(columns=[\"cumcount\"])\n", + " df['cumcount'] = df.groupby(level=0).cumcount()\n", + " df = df[df['cumcount'] >= 1]\n", + " df = df.drop(columns=['cumcount'])\n", " return df\n", "\n", "# Find duplicates by applying the function to each partition\n", @@ -4424,9 +4498,7 @@ "docs_to_remove = docs_to_remove.reset_index()\n", "\n", "docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n", - "docs_to_remove = docs_to_remove.rename(\n", - " columns={\"dataset_id\": \"to_remove_dataset_id\", \"doc_id\": \"to_remove_doc_id\"}\n", - ")\n", + "docs_to_remove = docs_to_remove.rename(columns={\"dataset_id\":\"to_remove_dataset_id\", \"doc_id\":\"to_remove_doc_id\"})\n", "docs_to_remove = docs_to_remove.reset_index(drop=True).persist()\n", "_ = wait(docs_to_remove)\n", "del _ \n", @@ -4444,7 +4516,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "id": "a6c6d7d6-9db5-4504-9d03-67c7ed69c409", "metadata": { "tags": [] @@ -4469,12 +4541,12 @@ " start_index=0,\n", " file_prefix=\"rpv2-2023-06-and-14-deduped\",\n", ")\n", - "print(f\"Data sharding took {time.time()-t0} s\")" + "print(f\"Data sharding took:{time.time()-t0}\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "63546dd3-1560-4b38-91ce-84c004636657", "metadata": { "tags": [] @@ -4492,10 +4564,10 @@ "from helper import convert_str_id_to_int\n", "\n", "input_dataset = DocumentDataset.read_json(os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped-resharded\"), backend=\"cudf\")\n", - "input_df = input_dataset.df[[\"raw_content\", \"id\"]]\n", + "input_df = input_dataset.df[['raw_content','id']]\n", "meta = input_df._meta\n", - "meta[\"doc_id\"]=np.int64([0])\n", - "meta[\"dataset_id\"]=np.uint32([0])\n", + "meta['doc_id']=np.int64([0])\n", + "meta['dataset_id']=np.uint32([0])\n", "input_df = input_df.map_partitions(\n", " convert_str_id_to_int,\n", " id_column=\"id\",\n", @@ -4505,7 +4577,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "88dfe6a9-a463-4d43-9f79-0d3df960961c", "metadata": { "tags": [] @@ -4522,15 +4594,15 @@ "source": [ "dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"/rpv2-2023-06-and-14-inter-deduped\"))\n", "deduped_df = input_df.merge(docs_to_remove,\n", - " left_on=[\"doc_id\", \"dataset_id\"],\n", + " left_on=['doc_id','dataset_id'],\n", " right_on=[\"to_remove_doc_id\", \"to_remove_dataset_id\"],\n", - " how=\"left\")\n", + " how='left')\n", "\n", - "deduped_df = 
deduped_df[deduped_df[\"to_remove_doc_id\"].isna()].drop(columns=[\"to_remove_doc_id\", \"to_remove_dataset_id\"]).reset_index(drop=True)\n", + "deduped_df = deduped_df[deduped_df['to_remove_doc_id'].isna()].drop(columns=['to_remove_doc_id', \"to_remove_dataset_id\"]).reset_index(drop=True)\n", "\n", "t0 = time.time()\n", "deduped_df.to_parquet(dedup_output_dir)\n", - "print(f\"Removing duplicates and writing deduplicated dataset took {time.time()-t0} seconds\")" + "print(f\"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds\")" ] }, { @@ -4538,7 +4610,7 @@ "id": "473fea1e-45e5-423c-9187-17867c0ad2a7", "metadata": {}, "source": [ - "We can verify that the deduplicated dataset has 1,585,546,179 documents, compared to 1,667,310,983 documents before deduplication." + "We can verify that the deduped dataset has 1,585,546,179 documents, compared to 1,667,310,983 documents befoe dedup." ] }, { @@ -4601,16 +4673,16 @@ "id": "e19b91e6-3156-4a1b-816b-500a54def99d", "metadata": {}, "source": [ - "Web crawled datasets often have low quality documents that we do not want the model to learn from; this, we can perform quality filtering to remove low quality data. NeMo Curator offers modules for both classifier-based and heuristic-based filtering. In this tutorial, we will perform heuristic-based filtering using a list of heuristic filters to improve data quality.\n", + "Web crawled dataset often has low quality documents that we do not want the model to learn from. We can perform quality filtering to remove low quality data. NeMo Curator offers modules for both classifier-based and heuristic-based filtering. In this tutorial, we will perform heuristic filtering using a list of heuristic filters to improve data quality.\n", "\n", - "NeMo Curator provides a generic list of heuristic filters, but for this tutorial, we only select 10 filters for demonstrative purposes. The selected filters are given in `config/heuristic_filter_en.yaml`.\n", + "Curator provides a generic list of heuristic filters but for this tutorial, we only select 10 filters for demo purposes. The selected filters are given in `config/heuristic_filter_en.yaml`.\n", "\n", - "Heuristic filtering in NeMo Curator is a CPU-based module, so we will need to use the CPU cluter." + "Heuristic filtering in Curator is a cpu module so we will need to use the cpu cluter." 
] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "a1347d09-8bc5-453b-aa4a-4b75c4b3b47a", "metadata": { "tags": [] @@ -4625,7 +4697,7 @@ } ], "source": [ - "scheduler_address = os.getenv(\"SCHEDULER_ADDRESS\")\n", + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", "cpu_client = get_client(scheduler_address=scheduler_address)\n", "print(f\"Num Workers = {get_num_workers(cpu_client)}\", flush=True)" ] @@ -4643,12 +4715,12 @@ "\n", "filter_config_file = os.path.join(base_dir, \"config/heuristic_filter_en.yaml\")\n", "hf_input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-and-14-inter-deduped\")\n", - "kept_document_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-and-14-heuristic-filtering\", \"hf.parquet\"))" + "kept_document_dir = expand_outdir_and_mkdir(os.path.join(base_dir,'rpv2-2023-06-and-14-heuristic-filtering','hf.parquet'))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "e5284b6f-c87e-4027-a802-ab08b92610cd", "metadata": { "tags": [] @@ -4670,14 +4742,14 @@ "# Load dataset\n", "dataset = DocumentDataset.read_parquet(hf_input_data_dir)\n", "\n", - "# Construct pipeline from config file\n", + "# construct pipeline from config\n", "filter_pipeline = build_filter_pipeline(filter_config_file)\n", "\n", - "# Filter data and write to disk\n", + "# filter data and write to disk\n", "filtered_dataset = filter_pipeline(dataset)\n", "filtered_dataset.to_parquet(kept_document_dir)\n", "\n", - "print(f\"Time taken for heuristic filtering: {time.time()-t0} s\")" + "print(f\"Time taken for Heuristic filtering: {time.time()-t0} s\")" ] }, { @@ -4685,7 +4757,7 @@ "id": "3ce0658a-97f9-412c-8eef-68f6ab0b4be6", "metadata": {}, "source": [ - "After filitering, we have 1,229,679,047 documents left, removing 355,867,132 documents from the deduplicated dataset." + "After filitering, we have 1,229,679,047 documents left, removing 355,867,132 documents from the deduped dataset." ] }, { @@ -4761,12 +4833,12 @@ ] }, { - "cell_type": "markdown", - "id": "7e0687d4", + "cell_type": "code", + "execution_count": null, + "id": "aa60a9b9-9158-43d6-9ea4-1f544c6f816e", "metadata": {}, - "source": [ - "In this tutorial, we were able to demonstrate how to use NeMo Curator for preprocessing rpv2 data." 
- ] + "outputs": [], + "source": [] } ], "metadata": { diff --git a/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb b/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb index a725aa1c2..3389fc6ee 100644 --- a/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb +++ b/tutorials/pretraining-vietnamese-data-curation/pretraining-vietnamese-data-curation.ipynb @@ -804,17 +804,6 @@ "Initialize and run deduplication:" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from nemo_curator.cache import initialize_cache_directory\n", - "\n", - "initialize_cache_directory(exact_dedup_output_dir)" - ] - }, { "cell_type": "code", "execution_count": null, @@ -843,6 +832,7 @@ " id_field=exact_dedup_dataset_id_field,\n", " text_field=exact_dedup_dataset_text_field,\n", " hash_method=\"md5\",\n", + " cache_dir=exact_dedup_output_dir,\n", ")\n", "duplicates = exact_dup(dataset=input_dataset)\n", "\n", diff --git a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb index deb945081..82935cde8 100644 --- a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb +++ b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb @@ -768,14 +768,15 @@ "id": "1baf027e", "metadata": {}, "source": [ - "## 4.Exact Dedplication\n", + "## 4. Exact Deduplication\n", "\n", - "In exact deduplication, the document text is hashed into unique string using certain hashing algorithm, such as 'md5'. The documents with exact hashed values are having identical text. We will output the `ID` of duplicated documents for removal later. The function used is `ExactDuplicates()`. Arguments for this function include:\n", - "- `id_field`: Key in input file for identifying document ID\n", - "- `text_field`: Key in input file which contains document text.\n", - "- `hash_method`: Hashing algorithm used. Default is `md5`\n", + "In exact deduplication, the document text is hashed into a unique string by using a hashing algorithm such as md5. The documents with exact hashed values are identified as having identical text. We will output the ID of duplicated documents for removal later. The class used for exact deduplication in NeMo Curator is called `ExactDuplicates`. Fields for this class include:\n", + "- `id_field`: Column in input file which contains a unique ID.\n", + "- `text_field`: Column in input file which contains document text.\n", + "- `hash_method`: Hashing algorithm used. Default is \"md5\".\n", + "- `cache_dir`: If specified via `ExactDuplicates(cache_dir=...)` or `Cache(cache_dir=...)`, the duplicated document IDs will be output to the cache directory. 
Otherwise, the IDs will not be saved.\n", "\n", - "Also, we are going to use GPU dask cluster to accelerate computation for deduplication (both exact and fuzzy)\n" + "We are going to use a GPU-based Dask cluster to accelerate computation for deduplication (both exact and fuzzy deduplication).\n" ] }, { @@ -878,19 +879,6 @@ "!mkdir -p {exact_dedup_output_dir}" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "65feffbc", - "metadata": {}, - "outputs": [], - "source": [ - "from nemo_curator.cache import initialize_cache_directory\n", - "\n", - "# Duplicated document ID list is output to the cache_dir\n", - "initialize_cache_directory(exact_dedup_output_dir)" - ] - }, { "cell_type": "markdown", "id": "1882204a", @@ -912,18 +900,19 @@ "# Read input dataset\n", "input_dataset = DocumentDataset.read_json(exact_dedup_input_dataset_dir, backend='cudf')\n", "\n", - "#Run exact deduplication to the input\n", + "# Run exact deduplication to the input\n", "exact_dup = ExactDuplicates(\n", " logger=exact_dedup_log_dir,\n", " id_field=exact_dedup_dataset_id_field,\n", " text_field=exact_dedup_dataset_text_field,\n", " hash_method=\"md5\",\n", + " cache_dir=exact_dedup_output_dir, # Duplicated document ID list is output to the cache_dir\n", ")\n", "duplicates = exact_dup(dataset=input_dataset)\n", "\n", - "print(f\"Number of exact duplicated file:{len(duplicates)}\")\n", + "print(f\"Number of exact duplicated files: {len(duplicates)}\")\n", "\n", - "print(f\"Time taken for exact duplicate:{time.time()-t0}\")" + "print(f\"Time taken for exact deduplication: {time.time()-t0}s\")" ] }, { @@ -1060,21 +1049,23 @@ "id": "5df73743", "metadata": {}, "source": [ - "### 5.1 Minhash\n", + "### 5.1 MinHash\n", + "\n", + "Run `MinHash` for this section. The output of a minhash is a Parquet file which contains a document ID and hashed value, which is an array which contains 260 32-bit integer data. To obtain such hashed values we need to go through the following steps:\n", "\n", - "Run `MinHash()` for this section. The output of a minhash is a parquet file which contains document ID and hashed value which is an array contains 260 32-bit integer data. To obtain such hashed values we need to go through the following steps:\n", "1. Generate a set of n-gram components of a document. For example, doc = `Nemo Curator is a data curation tool`, a 3-gram set of this document will be `['Nemo Curator is','Curator is a','is a data','a data curation','data curation tool']`\n", - "2. Hashed each n-gram into numerical values\n", + "2. Hash each n-gram into numerical values\n", "3. Generate a random hash function $H_1()$ which will hash each numeric n-gram into a 32-bit integer and take the minimum integer to use as minhash value for $H_1()$\n", - "4. Repeat step 2 and 3 with hash function $H_x()$ until desired minhash length is reached. Minhash value of each iteration will be append together to form the final minhash array. \n", + "4. Repeat step 2 and 3 with hash function $H_x()$ until the desired minhash length is reached. The minhash value of each iteration will be append together to form the final minhash array. \n", "\n", "Arguments include:\n", - "- `seed`:Random seed used for initializing the hash functions used to compute the MinHashes. It's advised to keep this value the same for different experiment for reproducibility\n", - "- `num_hashes`:Length of each minhash array. Default is 260. 
Longer minhash length will have better estimate of actual Jaccard similarity, but require more computational power\n", - "- `char_ngrams`:n-gram length\n", - "- `use_64bit_hash`:Whether to use 64bit or 32bit hash function\n", - "- `id_field`: Key in input file for identifying document ID\n", - "- `text_field`: Key in input file which contains document text." + "- `seed`: Random seed used for initializing the hash functions used to compute the minhashes. It is advised to keep this value the same for different experiments for reproducibility.\n", + "- `num_hashes`: Length of each minhash array. Default is 260. A longer minhash length will have better estimate of actual Jaccard similarity, but require more computational power.\n", + "- `char_ngrams`: n-gram length.\n", + "- `use_64bit_hash`: Whether to use 64-bit or 32-bit hash function.\n", + "- `id_field`: Column in input file which contains a unique ID.\n", + "- `text_field`: Column in input file which contains document text.\n", + "- `cache_dir`: If specified via `MinHash(cache_dir=...)` or `Cache(cache_dir=...)`, the intermediate result will be output to the cache directory." ] }, { @@ -1086,8 +1077,7 @@ }, "outputs": [], "source": [ - "from nemo_curator import MinHash\n", - "from nemo_curator.cache import initialize_cache_directory" + "from nemo_curator import MinHash" ] }, { @@ -1107,19 +1097,20 @@ }, "outputs": [], "source": [ - "#Input\n", + "# Input\n", "minhash_data_path = added_id_output_path\n", - "#Output\n", - "minhash_base_output_path = os.path.join(data_dir,\"fuzzy/minhash\")\n", - "minhash_log_dir = os.path.join(minhash_base_output_path,'log')\n", - "minhash_output_dir = os.path.join(minhash_base_output_path,'data')\n", - "initialize_cache_directory(minhash_output_dir)\n", - "#Specify dataset name\n", - "dataset_name = 'TH_wikipedia'\n", "\n", - "#Relevant parameters\n", - "minhash_id_field = 'id'\n", - "minhash_text_field = 'text'\n", + "# Output\n", + "minhash_base_output_path = os.path.join(data_dir, \"fuzzy/minhash\")\n", + "minhash_log_dir = os.path.join(minhash_base_output_path, \"log\")\n", + "minhash_output_dir = os.path.join(minhash_base_output_path, \"data\")\n", + "\n", + "# Specify dataset name\n", + "dataset_name = \"TH_wikipedia\"\n", + "\n", + "# Relevant parameters\n", + "minhash_id_field = \"id\"\n", + "minhash_text_field = \"text\"\n", "seed = 10\n", "minhash_length = 260\n", "char_ngram = 5\n", @@ -1150,7 +1141,8 @@ "t0 = time.time()\n", "print(f\"Computing minhashes for {minhash_data_path}\")\n", "\n", - "# Load data. 
Only the [minhash_id_field, text_field] columns are needed\n", + "# Load data\n", + "# Only the [minhash_id_field, text_field] columns are needed\n", "files = get_all_files_paths_under(root=minhash_data_path, recurse_subdirectories=False)\n", "files = [f for f in files if f.endswith(\".jsonl\")]\n", "df = read_data(\n", @@ -1161,7 +1153,7 @@ " add_filename=False,\n", ")[[minhash_id_field, minhash_text_field]]\n", "\n", - "# Run MinHash() on input data\n", + "# Run MinHash on input data\n", "minhasher = MinHash(\n", " seed=seed,\n", " num_hashes=minhash_length,\n", @@ -1170,10 +1162,11 @@ " logger=minhash_log_dir,\n", " id_field=minhash_id_field,\n", " text_field=minhash_text_field,\n", + " cache_dir=minhash_output_dir,\n", ")\n", "res = minhasher(DocumentDataset(df)).df\n", "\n", - "print(f\"Time taken for MinHash:{time.time()-t0}\")" + "print(f\"Time taken for MinHash: {time.time()-t0}\")" ] }, { @@ -1203,17 +1196,18 @@ "metadata": {}, "source": [ "### 5.2 LSH\n", - "`LSH()` implements LSH algorithm which includes the following steps:\n", + "`LSH` implements the Locality Sensitive Hashing algorithm which includes the following steps:\n", "1. Divide the minhash array into `X` different portions. \n", "2. For each portions, hash the minhash values into buckets. One document will be assigned to `X` buckets.\n", - "3. Documents within the same bucket will be deemed similar. Since every document will be assigned `X` buckets and as long as two documents share 1 or more buckets they are deemed similar, the result of LSH will have more false positive as compared to false negative. The false positive cases will be filtered in following modules, namely jaccard compute.\n", + "3. Documents within the same bucket will be deemed similar. Since every document will be assigned `X` buckets and as long as two documents share 1 or more buckets they are deemed similar, the result of LSH will have more false positive values as compared to false negative values. The false positive cases will be filtered in following modules, namely Jaccard compute.\n", "\n", "Arguments include:\n", - "- `minhash_length`:Length of minhash signature. Must be consistent with `MinHash()`\n", - "- `num_buckets`: Number of buckets\n", - "- `buckets_per_shuffle`: Number of buckets to shuffle concurrently\n", - "- `id_field`: Key in input file for identifying document ID\n", - "- `minhash_field`: Key in input file for identifying document MinHash signature " + "- `minhash_length`: Length of minhash signature. 
Must be consistent with `MinHash`.\n", + "- `num_buckets`: Number of buckets.\n", + "- `buckets_per_shuffle`: Number of buckets to shuffle concurrently.\n", + "- `id_field`: Column in input file which contains a unique ID.\n", + "- `minhash_field`: Column in input file for identifying a document's MinHash signature.\n", + "- `cache_dir`: If specified via `LSH(cache_dir=...)` or `Cache(cache_dir=...)`, the intermediate result will be output to the cache directory.\"" ] }, { @@ -1247,21 +1241,20 @@ }, "outputs": [], "source": [ - "#Input\n", + "# Input\n", "lsh_input_data_path = minhash_output_dir\n", "\n", - "#Output\n", - "lsh_base_output_path = os.path.join(data_dir,\"fuzzy/lsh\")\n", - "lsh_log_dir = os.path.join(lsh_base_output_path,'log')\n", - "lsh_output_dir = os.path.join(lsh_base_output_path,'data')\n", - "initialize_cache_directory(lsh_output_dir)\n", + "# Output\n", + "lsh_base_output_path = os.path.join(data_dir, \"fuzzy/lsh\")\n", + "lsh_log_dir = os.path.join(lsh_base_output_path, \"log\")\n", + "lsh_output_dir = os.path.join(lsh_base_output_path, \"data\")\n", "\n", - "#Relevant parameters\n", - "lsh_id_field = 'id'\n", - "minhash_field = '_minhash_signature'\n", - "minhash_length=260\n", - "num_bands=20\n", - "buckets_per_shuffle=1\n", + "# Relevant parameters\n", + "lsh_id_field = \"id\"\n", + "minhash_field = \"_minhash_signature\"\n", + "minhash_length = 260\n", + "num_bands = 20\n", + "buckets_per_shuffle = 1\n", "\n", "!mkdir -p {lsh_log_dir}\n", "!mkdir -p {lsh_output_dir}" @@ -1286,7 +1279,7 @@ "source": [ "t0 = time.time()\n", "\n", - "#Load MinHash output\n", + "# Load MinHash output\n", "df = dask_cudf.read_parquet(lsh_input_data_path, blocksize=\"2GB\", aggregate_files=True, backend = \"cudf\")\n", "df = df.map_partitions(\n", " convert_str_id_to_int,\n", @@ -1296,8 +1289,9 @@ " ),\n", ")\n", "\n", - "#Run LSH()\n", + "# Run LSH\n", "lsh = LSH(\n", + " cache_dir=lsh_output_dir,\n", " num_hashes=minhash_length,\n", " num_buckets=num_bands,\n", " buckets_per_shuffle=buckets_per_shuffle,\n", @@ -1308,7 +1302,7 @@ "res = lsh(DocumentDataset(df))\n", "\n", "t1 = time.time()\n", - "print(f\"Time taken for LSH:{time.time()-t0}\")" + "print(f\"Time taken for LSH: {time.time()-t0}s\")" ] }, { @@ -1680,11 +1674,13 @@ "metadata": {}, "source": [ "### 5.5 Connected Components\n", - "This section uses `ConnectedComponents()`.This section takes a dataset consisting of document pairs and their corresponding jaccard similarity to construct a non-directed graph. A edge will be form between documents whose Jaccard similarity is higher than the threshold (0.8 in this example). It will then identify the connected components in this graph. Documents within the same connected components are deemed duplicated\n", + "This section uses the `ConnectedComponents` class. This section takes a dataset consisting of document pairs and their corresponding Jaccard similarity scores to construct a non-directed graph. A edge will be formed between documents whose Jaccard similarity is higher than a given threshold (0.8 in this example). It will then identify the connected components in this graph. 
Documents within the same connected components are deemed duplicates.\n", "\n", "Arguments include:\n", - "- `id_column`: Prefix of ID column in `jaccard_similarity_results.parquet`\n", - "- `jaccard_threshold`: Threshold to determine if an edge exists between two documents" + "- `cache_dir`: If specified via `ConnectedComponents(cache_dir=...)` or `Cache(cache_dir=...)`, the intermediate results will be output to the cache directory.\n", + "- `jaccard_pairs_path`: Input path for `jaccard_similarity_results.parquet`.\n", + "- `id_column`: Prefix of ID column in `jaccard_similarity_results.parquet`.\n", + "- `jaccard_threshold`: Threshold to determine if an edge exists between two documents." ] }, { @@ -1716,18 +1712,16 @@ }, "outputs": [], "source": [ - "from nemo_curator.cache import initialize_cache_directory\n", - "\n", - "#Input\n", + "# Input\n", "jaccard_pairs_path = jaccard_compute_output_results_path\n", "\n", - "#Output\n", + "# Output\n", "connected_component_base_output_path = os.path.join(data_dir, \"fuzzy/cc\")\n", "connected_component_output_path = os.path.join(connected_component_base_output_path, \"connected_components.parquet\")\n", - "initialize_cache_directory(os.path.join(data_dir, \"fuzzy/jaccard_compute\"))\n", + "connected_component_cache_dir = os.path.join(connected_component_base_output_path, \"cache\")\n", "\n", - "#Relevant parameters\n", - "input_id_field = 'id'\n", + "# Relevant parameters\n", + "input_id_field = \"id\"\n", "jaccard_threshold = 0.8\n", "\n", "!mkdir -p {connected_component_base_output_path}" @@ -1753,14 +1747,15 @@ "t0 = time.time()\n", " \n", "components_stage = ConnectedComponents(\n", + " cache_dir=connected_component_cache_dir,\n", + " jaccard_pairs_path=jaccard_pairs_path,\n", " id_column=input_id_field,\n", " jaccard_threshold=jaccard_threshold,\n", - " false_positive_check=True,\n", ")\n", "\n", - "#Load and run connected component\n", + "# Load and run connected components\n", "components_stage.cc_workflow(output_path=connected_component_output_path)\n", - "print(f\"Time taken for Connected Component: {time.time()-t0} s\")" + "print(f\"Time taken for Connected Components: {time.time()-t0}s\")" ] }, { @@ -1950,24 +1945,24 @@ }, "outputs": [], "source": [ - "#Input\n", + "# Input\n", "fuzzy_dedup_data_path = added_id_output_path\n", - "#Output\n", - "fuzzy_dedup_base_output_path = os.path.join(data_dir,\"fuzzy_wrapper\")\n", - "fuzzy_dedup_log_dir = os.path.join(fuzzy_dedup_base_output_path,'log')\n", - "fuzzy_dedup_cache_dir = os.path.join(fuzzy_dedup_base_output_path,'cache')\n", - "initialize_cache_directory(fuzzy_dedup_cache_dir)\n", - "fuzzy_dedup_output_dir = os.path.join(fuzzy_dedup_base_output_path,'data')\n", - "#Specify dataset name\n", - "dataset_name = 'TH_wikipedia'\n", - "\n", - "#Relevant parameters\n", - "id_field = 'id'\n", - "text_field = 'text'\n", + "# Output\n", + "fuzzy_dedup_base_output_path = os.path.join(data_dir, \"fuzzy_wrapper\")\n", + "fuzzy_dedup_log_dir = os.path.join(fuzzy_dedup_base_output_path, \"log\")\n", + "fuzzy_dedup_cache_dir = os.path.join(fuzzy_dedup_base_output_path, \"cache\")\n", + "fuzzy_dedup_output_dir = os.path.join(fuzzy_dedup_base_output_path, \"data\")\n", + "# Specify dataset name\n", + "dataset_name = \"TH_wikipedia\"\n", + "\n", + "# Relevant parameters\n", + "id_field = \"id\"\n", + "text_field = \"text\"\n", "filetype = \"parquet\"\n", "\n", "!mkdir -p {fuzzy_dedup_base_output_path}\n", "!mkdir -p {fuzzy_dedup_log_dir}\n", + "!mkdir -p {fuzzy_dedup_cache_dir}\n", "!mkdir -p 
{fuzzy_dedup_output_dir}" ] }, @@ -2000,16 +1995,16 @@ }, "outputs": [], "source": [ - "with dask.config.set({\"dataframe.backend\": 'cudf'}):\n", - " \n", + "with dask.config.set({\"dataframe.backend\": \"cudf\"}):\n", " t0 = time.time()\n", - " \n", - " input_dataset = DocumentDataset.read_json(fuzzy_dedup_data_path, backend='cudf')\n", + "\n", + " input_dataset = DocumentDataset.read_json(fuzzy_dedup_data_path, backend=\"cudf\")\n", "\n", " fuzzy_dedup_config = FuzzyDuplicatesConfig(\n", + " cache_dir=fuzzy_dedup_cache_dir,\n", " id_field=id_field,\n", " text_field=text_field,\n", - " seed=seed, #Use the seed set in Minhash section for consistency\n", + " seed=seed, # Use the seed set in MinHash section for consistency\n", " char_ngrams=5,\n", " num_buckets=20,\n", " hashes_per_bucket=13,\n", @@ -2024,7 +2019,7 @@ " \n", " duplicates.to_parquet(fuzzy_dedup_output_dir, write_to_filename=False)\n", " \n", - " print(f\"Time taken for Connected Component: {time.time()-t0} s\")\n" + " print(f\"Time taken for Connected Components: {time.time()-t0}s\")" ] }, { diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py index 36069753f..fcbbf9dde 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py @@ -5,7 +5,6 @@ import dask_cudf from nemo_curator import MinHash -from nemo_curator.cache import initialize_cache_directory from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, get_num_workers from nemo_curator.utils.file_utils import get_all_files_paths_under @@ -35,7 +34,6 @@ def read_folder(input_folder, columns=["nemo_id", "text"]): minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash") minhash_output_dir = os.path.join(minhash_base_output_path, "data") - initialize_cache_directory(minhash_output_dir) # Relevant parameters minhash_id_field = "nemo_id" @@ -60,6 +58,7 @@ def read_folder(input_folder, columns=["nemo_id", "text"]): use_64bit_hash=use_64bit_hash, id_field=minhash_id_field, text_field=minhash_text_field, + cache_dir=minhash_output_dir, ) res = minhasher(DocumentDataset(text_ddf)).df logging.info(f"Time taken for MinHash: {time.time()-t0:.2f}sec.") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py index 6e690a4fa..0df90c207 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py @@ -7,7 +7,6 @@ import numpy as np from nemo_curator import LSH -from nemo_curator.cache import initialize_cache_directory from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, get_num_workers from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int @@ -32,7 +31,6 @@ # Output lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh") lsh_output_dir = os.path.join(lsh_base_output_path, "data") - initialize_cache_directory(lsh_output_dir) # Relevant parameters lsh_id_field = "nemo_id" @@ -55,6 +53,7 @@ ) # Run LSH() lsh = LSH( + cache_dir=lsh_output_dir, num_hashes=minhash_length, num_buckets=num_bands, buckets_per_shuffle=buckets_per_shuffle, diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py index 46d650bf9..21783256b 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py +++ 
b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py @@ -5,7 +5,6 @@ import dask_cudf from nemo_curator import BucketsToEdges -from nemo_curator.cache import initialize_cache_directory from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, get_num_workers @@ -28,7 +27,6 @@ # Output buckets_to_edges_out = os.path.join(DATA_BASE, "fuzzy/buckets_to_edges/data") - initialize_cache_directory(buckets_to_edges_out) t0 = time.time() @@ -37,7 +35,10 @@ split_row_groups=False, ) - buckets_to_edges = BucketsToEdges(id_fields=["dataset_id", "doc_id"]) + buckets_to_edges = BucketsToEdges( + cache_dir=buckets_to_edges_out, + id_fields=["dataset_id", "doc_id"], + ) ddf_b2e = buckets_to_edges(DocumentDataset(ddf_bk)) logging.info(f"Time taken for Buckets to Edges: {time.time() - t0} s") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py index f8c8a0779..e6ad2165b 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py @@ -3,7 +3,6 @@ import time from nemo_curator import ConnectedComponents -from nemo_curator.cache import initialize_cache_directory from nemo_curator.utils.distributed_utils import get_client, get_num_workers logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) @@ -17,14 +16,18 @@ client = get_client(scheduler_file=SCHEDULER_FILE) logging.info(f"Number of dask workers: {get_num_workers(client)}") # Input - buckets_to_edges_out = os.path.join(DATA_BASE, "fuzzy/buckets_to_edges/data") - initialize_cache_directory(buckets_to_edges_out) + buckets_to_edges_out = os.path.join( + DATA_BASE, "fuzzy/buckets_to_edges/data/_edges.parquet" + ) # Output connected_component_base_output_path = os.path.join(DATA_BASE, "fuzzy/cc") connected_component_output_path = os.path.join( connected_component_base_output_path, "connected_components.parquet" ) + connected_component_cache_dir = os.path.join( + connected_component_base_output_path, "cache" + ) # Relevant parameters input_id_field = "id" @@ -32,8 +35,9 @@ t0 = time.time() components_stage = ConnectedComponents( + cache_dir=connected_component_cache_dir, + jaccard_pairs_path=buckets_to_edges_out, id_column=input_id_field, - false_positive_check=False, ) # Load and run connected components
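For intuition about what the `0_minhash.py` and `1_lsh.py` stages above compute at scale, here is a small self-contained toy (not the library's implementation; the hash construction is purely illustrative): character n-grams are min-hashed into a signature, the signature is cut into bands, and documents that share any band land in the same candidate bucket for the later Jaccard and connected-components stages.

```python
# Toy MinHash + LSH banding sketch; nemo_curator's MinHash/LSH do this on GPUs.
import hashlib
from typing import List, Set, Tuple

def char_ngrams(text: str, n: int = 5) -> Set[str]:
    return {text[i:i + n] for i in range(max(1, len(text) - n + 1))}

def minhash_signature(text: str, num_hashes: int = 8, n: int = 5) -> List[int]:
    grams = char_ngrams(text, n)
    # One salted hash per "hash function"; keep the minimum value over all n-grams.
    return [
        min(int(hashlib.md5(f"{seed}:{g}".encode()).hexdigest()[:8], 16) for g in grams)
        for seed in range(num_hashes)
    ]

def lsh_bands(signature: List[int], num_bands: int = 4) -> List[Tuple[int, ...]]:
    rows = len(signature) // num_bands
    return [tuple(signature[b * rows:(b + 1) * rows]) for b in range(num_bands)]

doc_a = "NeMo Curator is a data curation tool"
doc_b = "NeMo Curator is a data curation toolkit"
shared = set(lsh_bands(minhash_signature(doc_a))) & set(lsh_bands(minhash_signature(doc_b)))
print(f"Shared LSH bands: {len(shared)}")  # near-duplicates tend to share at least one band
```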