From eff6f78c759d3afdd0ec8191a8aa1a7b2fc1b679 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 24 May 2024 14:41:17 -0700 Subject: [PATCH 1/5] Add add_id stage to curator Signed-off-by: Ryan Wolf --- .../common_crawl/add_id/add_id.yaml | 14 ++++++ .../common_crawl/curate_common_crawl.yaml | 2 + .../core/data_curation_stages.py | 43 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml diff --git a/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml new file mode 100644 index 0000000000..2401a62cdb --- /dev/null +++ b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml @@ -0,0 +1,14 @@ +run: + name: 'add-id' + results_dir: ${data_curation.run.results_dir}/${.name} + dependency: "singleton" + time_limit: "01:00:00" + nodes: 1 + node_type: cpu + +output_data_dir: ${.run.results_dir}/id_added +id_field_name: adlr_id +id_prefix: doc_id +shuffle: False +input_file_type: jsonl +output_file_type: jsonl \ No newline at end of file diff --git a/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml b/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml index 0baa673b91..29bc048dd0 100644 --- a/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml +++ b/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml @@ -30,6 +30,7 @@ filter_quality: - quality_filtering fuzzy_deduplication: + - add_id - compute_minhashes - minhash_buckets - jaccard_map_buckets @@ -58,6 +59,7 @@ defaults: - common_crawl/connected_component/connected_component - common_crawl/write_deduped_result_with_text/write_deduped_result_with_text - common_crawl/verify_all_pairs_jaccard/verify_all_pairs_jaccard + - common_crawl/add_id/add_id special: choose_language: diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index f3524e222b..15fbd57aaf 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -770,6 +770,7 @@ def __init__(self, cfg): "connected_component": ConnectedComponent, "write_deduped_result_with_text": WriteDedupedResultWithText, "verify_all_pairs_jaccard": VerifyAllPairsJaccard, + "add_id": AddId, } def setup_stage_vars(self, cfg): @@ -1159,3 +1160,45 @@ def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: command_groups = clean_command_groups(command_groups) return command_groups + + +class AddId(DataCurationSubStage): + def __init__(self, cfg, memory): + super().__init__(cfg, memory) + + def setup_stage_vars(self, cfg): + """Setup the stage vars, i.e. stage name and stage cfg""" + self.stage_name = "add_id" + self.stage_cfg = self._get_sub_stage_confg(self.stage_name) + + def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: + """ Builds the command groups for the current stage """ + stage_cfg = self.stage_cfg + + command_groups = [[]] + + # Create the list of arguments for the filter_documents command + args = create_args_list( + replace_underscore=True, + output_data_dir=stage_cfg.get("output_data_dir"), + id_field_name=stage_cfg.get("id_field_name"), + id_prefix=stage_cfg.get("id_prefix"), + shuffle=stage_cfg.get("shuffle"), + input_file_type=stage_cfg.get("input_file_type"), + output_file_type=stage_cfg.get("output_file_type"), + scheduler_file=self.log_folder / "scheduler.json", + ) + + runscript = " \\\n ".join(["verify_all_pairs_jaccard", *args]) + runscript_path = os.path.join(self.log_folder, "verify_all_pairs_jaccard.sh") + + with open(runscript_path, "w") as f: + f.write(runscript) + + core_command = [self.make_dask_command_string(runscript_path)] + + core_command_string = " \\\n ".join(core_command) + command_groups[-1] += [core_command_string] + command_groups = clean_command_groups(command_groups) + + return command_groups \ No newline at end of file From 576c3ce3d54e5480986e70742e5fcefea7e3139d Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 24 May 2024 15:15:59 -0700 Subject: [PATCH 2/5] Fix add_id script command Signed-off-by: Ryan Wolf --- launcher_scripts/nemo_launcher/core/data_curation_stages.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index 15fbd57aaf..e12d3c0c6f 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -1189,8 +1189,8 @@ def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: scheduler_file=self.log_folder / "scheduler.json", ) - runscript = " \\\n ".join(["verify_all_pairs_jaccard", *args]) - runscript_path = os.path.join(self.log_folder, "verify_all_pairs_jaccard.sh") + runscript = " \\\n ".join(["add_id", *args]) + runscript_path = os.path.join(self.log_folder, "add_id.sh") with open(runscript_path, "w") as f: f.write(runscript) From 441e2b8c648b023a2df8cd7976f9c56fd7fa59f8 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 24 May 2024 16:00:33 -0700 Subject: [PATCH 3/5] Add memory to add_id Signed-off-by: Ryan Wolf --- .../conf/data_curation/common_crawl/add_id/add_id.yaml | 1 - launcher_scripts/nemo_launcher/core/data_curation_stages.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml index 2401a62cdb..0f6aae0d76 100644 --- a/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml +++ b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml @@ -9,6 +9,5 @@ run: output_data_dir: ${.run.results_dir}/id_added id_field_name: adlr_id id_prefix: doc_id -shuffle: False input_file_type: jsonl output_file_type: jsonl \ No newline at end of file diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index e12d3c0c6f..8ba1ca084e 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -1180,15 +1180,17 @@ def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: # Create the list of arguments for the filter_documents command args = create_args_list( replace_underscore=True, + input_data_dir=self.memory.data_dir, output_data_dir=stage_cfg.get("output_data_dir"), id_field_name=stage_cfg.get("id_field_name"), id_prefix=stage_cfg.get("id_prefix"), - shuffle=stage_cfg.get("shuffle"), input_file_type=stage_cfg.get("input_file_type"), output_file_type=stage_cfg.get("output_file_type"), scheduler_file=self.log_folder / "scheduler.json", ) + self.memory.data_dir = stage_cfg.get("output_data_dir") + runscript = " \\\n ".join(["add_id", *args]) runscript_path = os.path.join(self.log_folder, "add_id.sh") From 5e1d9e9b354b778627fca7ab20223761622bdb32 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 24 May 2024 16:14:16 -0700 Subject: [PATCH 4/5] Fix style Signed-off-by: Ryan Wolf --- .../core/data_curation_stages.py | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index 8ba1ca084e..7f54e5a4df 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -248,7 +248,7 @@ def run(self): class QualityFiltering(DataCurationSubStage): - """ DataCurationSubStage for performing quality filtering on documents """ + """DataCurationSubStage for performing quality filtering on documents""" def __init__(self, cfg, memory): super().__init__(cfg, memory) @@ -259,7 +259,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -327,7 +327,10 @@ def setup_stage_vars(self, cfg): self.cfg.get("data_curation").get(dataset_name).get(self.stage_name) ) - def _make_cluster_parameters(self, cluster: str,) -> Dict: + def _make_cluster_parameters( + self, + cluster: str, + ) -> Dict: """ Make a cluster-specific parameters for jobs on different clusters. Current clusters include bcm(slurm), bcp and interactive. @@ -363,7 +366,9 @@ def _make_cluster_parameters(self, cluster: str,) -> Dict: **slurm_cfg, } cluster_params.update( - {**shared_parameters,} + { + **shared_parameters, + } ) cluster_params["job_name"] = job_name_prefix + cluster_params["job_name"] cluster_params["nodes"] = nodes @@ -428,7 +433,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -488,7 +493,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -544,7 +549,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -589,7 +594,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -639,7 +644,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -698,7 +703,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -841,7 +846,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -886,7 +891,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -929,7 +934,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -970,7 +975,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1012,7 +1017,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1053,7 +1058,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1093,7 +1098,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1133,7 +1138,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1172,7 +1177,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1203,4 +1208,4 @@ def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: command_groups[-1] += [core_command_string] command_groups = clean_command_groups(command_groups) - return command_groups \ No newline at end of file + return command_groups From 74a3c29d6bc28408b6a025c0fd3e70f23ddcc7a3 Mon Sep 17 00:00:00 2001 From: Ryan Wolf Date: Fri, 24 May 2024 16:19:17 -0700 Subject: [PATCH 5/5] Fix style Signed-off-by: Ryan Wolf --- .../nemo_launcher/core/data_curation_stages.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index 7f54e5a4df..2d306f84a9 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -327,10 +327,7 @@ def setup_stage_vars(self, cfg): self.cfg.get("data_curation").get(dataset_name).get(self.stage_name) ) - def _make_cluster_parameters( - self, - cluster: str, - ) -> Dict: + def _make_cluster_parameters(self, cluster: str,) -> Dict: """ Make a cluster-specific parameters for jobs on different clusters. Current clusters include bcm(slurm), bcp and interactive. @@ -366,9 +363,7 @@ def _make_cluster_parameters( **slurm_cfg, } cluster_params.update( - { - **shared_parameters, - } + {**shared_parameters,} ) cluster_params["job_name"] = job_name_prefix + cluster_params["job_name"] cluster_params["nodes"] = nodes