#!/usr/bin/env python

+import itertools
import os
from enum import Enum
from pathlib import Path
-from typing import Optional
+from typing import Any, Callable, Optional

import numpy as np
import tqdm
from modalities.dataloader.create_packed_data import EmbeddedStreamData, PackedDataGenerator, join_embedded_stream_data
from modalities.dataloader.dataset import PackedMemMapDatasetBase
from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader
-from modalities.dataloader.preprocessing.chunking.create_chunks import Chunking
from modalities.dataloader.preprocessing.tokenization.tokenized_file_writer import TokenizedFileWriter
-from modalities.dataloader.shuffle_tokenized_data import TokenizedDataShuffler
from modalities.models.huggingface_adapters.hf_adapter import HFModelAdapter
+from modalities.preprocessing.create_chunks import Chunking
+from modalities.preprocessing.shuffle_data import DataShuffler
from modalities.registry.components import COMPONENTS
from modalities.registry.registry import Registry
from modalities.utils.logging import get_logger
@@ -77,6 +78,8 @@ def create_raw_data_index(
    Raises:
        ValueError: If the index file already exists.
    """
+    if src_path == index_path:
+        raise ValueError("Input and output index paths must be different.")
    index_path = LargeFileLinesReader.default_index_path(src_path, index_path)
    if index_path.exists():
        stop_process = enforce_file_existence_policy(index_path, file_existence_policy)
@@ -136,15 +139,77 @@ def shuffle_tokenized_data(
        file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.
        seed (Optional[int]): The seed to use for shuffling.
    """
+    if input_data_path == output_data_path:
+        raise ValueError("Input and output file paths must be different.")
    if output_data_path.exists():
-        if not enforce_file_existence_policy(output_data_path, file_existence_policy):
+        stop_process = enforce_file_existence_policy(output_data_path, file_existence_policy)
+        if stop_process:
            return

-    TokenizedDataShuffler.shuffle_tokenized_data(
+    DataShuffler.shuffle_tokenized_data(
        input_data_path=input_data_path, output_data_path=output_data_path, batch_size=batch_size, seed=seed
    )


+def shuffle_jsonl_data(
+    input_data_path: Path,
+    output_data_path: Path,
+    file_existence_policy: FileExistencePolicy,
+    seed: Optional[int] = None,
+):
+    """Shuffles a JSONL file (.jsonl) and stores it on disc.
+
+    Args:
+        input_data_path (Path): File path to the jsonl data (.jsonl).
+        output_data_path (Path): File path to write the shuffled jsonl data.
+        file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.
+        seed (Optional[int]): The seed to use for shuffling.
+    """
+    if input_data_path == output_data_path:
+        raise ValueError("Input and output file paths must be different.")
+    if output_data_path.exists():
+        stop_process = enforce_file_existence_policy(output_data_path, file_existence_policy)
+        if stop_process:
+            return
+
+    DataShuffler.shuffle_jsonl_data(input_data_path=input_data_path, output_data_path=output_data_path, seed=seed)
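+
+# Illustrative usage sketch (not part of this change): both shuffle entry points are called with
+# keyword arguments. The file paths, batch size and FileExistencePolicy member below are
+# assumptions for the example only.
+#
+#   shuffle_tokenized_data(
+#       input_data_path=Path("data/train.pbin"),
+#       output_data_path=Path("data/train_shuffled.pbin"),
+#       batch_size=1024,
+#       file_existence_policy=FileExistencePolicy.ERROR,
+#       seed=42,
+#   )
+#   shuffle_jsonl_data(
+#       input_data_path=Path("data/train.jsonl"),
+#       output_data_path=Path("data/train_shuffled.jsonl"),
+#       file_existence_policy=FileExistencePolicy.ERROR,
+#       seed=42,
+#   )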
+
+
+def create_filtered_tokenized_dataset(
+    input_data_path: Path,
+    filter_routine: Callable[[int], bool],
+    output_data_path: Path,
+    file_existence_policy: FileExistencePolicy,
+):
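+    """Filters a tokenized dataset and writes the surviving samples to a new file.
+
+    The filter_routine is called with each sample index; samples for which it returns
+    False are dropped from the output.
+
+    Args:
+        input_data_path (Path): File path to the tokenized input data.
+        filter_routine (Callable[[int], bool]): Predicate deciding for a sample index whether the sample is kept.
+        output_data_path (Path): File path to write the filtered tokenized data.
+        file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.
+    """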
+    if input_data_path == output_data_path:
+        raise ValueError("Input and output file paths must be different.")
+    if output_data_path.exists():
+        stop_process = enforce_file_existence_policy(output_data_path, file_existence_policy)
+        if stop_process:
+            return
+
+    sample_key = "text"
+
+    dataset = PackedMemMapDatasetBase(raw_data_path=input_data_path, sample_key=sample_key, load_index=True)
+
+    # Both generators below run lazily.
+    filter_generator = (filter_routine(i) for i in range(len(dataset)))
+    # We lazily extract the samples, as TokenizedFileWriter.write_tokenized_dataset
+    # expects an iterator of numpy arrays.
+    samples_extractor = (dataset[i][sample_key] for i in range(len(dataset)))
+    # itertools.compress skips samples for which the filter_routine returns False.
+    # It also evaluates lazily.
+    dataset_filtered = itertools.compress(samples_extractor, filter_generator)
+
+    get_logger(name="main").info(f"Writing filtered dataset to {str(output_data_path)}...")
+    TokenizedFileWriter.write_tokenized_dataset(
+        tokenized_dataset=dataset_filtered,
+        tokenized_dataset_file_path=output_data_path,
+        token_size_in_bytes=dataset.token_size_in_bytes,
+    )
+    get_logger(name="main").info(f"Filtered dataset was successfully written to {str(output_data_path)}.")
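+
+# Illustrative usage sketch (not part of this change): any index-based predicate can be passed
+# as filter_routine. The paths and the FileExistencePolicy member below are assumptions for the
+# example only; a hypothetical routine keeping every second sample could look like this:
+#
+#   create_filtered_tokenized_dataset(
+#       input_data_path=Path("data/train.pbin"),
+#       filter_routine=lambda sample_idx: sample_idx % 2 == 0,
+#       output_data_path=Path("data/train_filtered.pbin"),
+#       file_existence_policy=FileExistencePolicy.ERROR,
+#   )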
+
+
def create_shuffled_dataset_chunk(
    file_path_list: list[Path],
    output_chunk_file_path: Path,
@@ -171,19 +236,22 @@ def create_shuffled_dataset_chunk(
        ValueError: If the chunk has no samples.
    """
    if output_chunk_file_path.exists():
-        if not enforce_file_existence_policy(output_chunk_file_path, file_existence_policy):
+        stop_process = enforce_file_existence_policy(output_chunk_file_path, file_existence_policy)
+        if stop_process:
            return

    samples = []
    token_size_in_bytes = None
    for file_path in tqdm.tqdm(file_path_list, desc=f"Loading file chunks of {chunk_id=}"):
+        if file_path == output_chunk_file_path:
+            raise ValueError("Input and output chunk file paths must be different.")
        dataset = PackedMemMapDatasetBase(raw_data_path=file_path, sample_key="text", load_index=True)
        if token_size_in_bytes is None:
            token_size_in_bytes = dataset.token_size_in_bytes
        elif token_size_in_bytes != dataset.token_size_in_bytes:
            raise ValueError("All datasets must have the same token size in bytes.")

-        file_samples: list[np.ndarray] = Chunking.get_file_chunk(
+        file_samples: list[np.ndarray] = Chunking.get_tokenized_file_chunk(
            dataset=dataset, num_chunks=num_chunks, chunk_id=chunk_id
        )
        samples.extend(file_samples)
@@ -207,6 +275,65 @@ def create_shuffled_dataset_chunk(
    get_logger(name="main").info(f"Chunk {chunk_id} was successfully written to {str(output_chunk_file_path)}.")


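+# Conceptual sketch (an assumption, not part of this change): the Chunking helpers used above and
+# below are expected to cut each file into num_chunks contiguous slices, roughly along the lines of
+#
+#   def get_chunk(items: list, num_chunks: int, chunk_id: int) -> list:
+#       chunk_size = (len(items) + num_chunks - 1) // num_chunks  # ceiling division
+#       return items[chunk_id * chunk_size : (chunk_id + 1) * chunk_size]
+#
+# which would also explain why a too large num_chunks can leave a trailing chunk empty and
+# trigger the "Chunk ... has no samples" error.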
+def create_shuffled_jsonl_dataset_chunk(
+    file_path_list: list[Path],
+    output_chunk_file_path: Path,
+    chunk_id: int,
+    num_chunks: int,
+    file_existence_policy: FileExistencePolicy,
+    global_seed: Optional[int] = None,
+):
+    """Creates a shuffled jsonl dataset chunk.
+
+    Given a dataset consisting of multiple jsonl files, this function
+    creates a shuffled dataset chunk for a given chunk id.
+    From each jsonl file, the respective chunk is extracted, shuffled
+    and written to a new jsonl file.
+
+    Args:
+        file_path_list (list[Path]): List of paths to the input jsonl files.
+        output_chunk_file_path (Path): Path to the output chunk which will be stored in jsonl format.
+        chunk_id (int): The id of the chunk to create.
+        num_chunks (int): The total number of chunks to create.
+        file_existence_policy (FileExistencePolicy): Policy to apply when the output chunk file already exists.
+        global_seed (Optional[int]): The global seed to use for shuffling.
+
+    Raises:
+        ValueError: If the chunk has no samples.
+    """
+    if output_chunk_file_path.exists():
+        stop_process = enforce_file_existence_policy(output_chunk_file_path, file_existence_policy)
+        if stop_process:
+            return
+
+    samples = []
+    for file_path in tqdm.tqdm(file_path_list, desc=f"Loading file chunks of {chunk_id=}"):
+        if file_path == output_chunk_file_path:
+            raise ValueError("Input and output chunk file paths must be different.")
+        with open(file_path, "rb") as f:
+            dataset = f.readlines()
+
+        file_samples: list[Any] = Chunking.get_jsonl_file_chunk(
+            dataset=dataset, num_chunks=num_chunks, chunk_id=chunk_id
+        )
+        samples.extend(file_samples)
+
+    if len(samples) == 0:
+        raise ValueError(
+            f"Chunk {chunk_id} has no samples. Please decrease the number of chunks to less than {chunk_id}."
+        )
+
+    # samples are shuffled in place
+    get_logger(name="main").info(f"Shuffling chunk {chunk_id}...")
+    seed = calculate_hashed_seed(input_data=[str(global_seed), str(chunk_id)]) if global_seed is not None else None
+    Chunking.shuffle_file_chunks_in_place(file_chunks=samples, seed=seed)
+
+    get_logger(name="main").info(f"Writing chunk {chunk_id} to {str(output_chunk_file_path)}...")
+    with open(output_chunk_file_path, "wb") as f:
+        for sample in samples:
+            f.write(sample)
+    get_logger(name="main").info(f"Chunk {chunk_id} was successfully written to {str(output_chunk_file_path)}.")
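+
+# Illustrative usage sketch (not part of this change): chunk creation is typically swept over all
+# chunk ids so that every output file receives its shuffled share of each input file. The paths,
+# chunk count and FileExistencePolicy member below are assumptions for the example only.
+#
+#   num_chunks = 8
+#   for chunk_id in range(num_chunks):
+#       create_shuffled_jsonl_dataset_chunk(
+#           file_path_list=[Path("data/part_0.jsonl"), Path("data/part_1.jsonl")],
+#           output_chunk_file_path=Path(f"chunks/chunk_{chunk_id}.jsonl"),
+#           chunk_id=chunk_id,
+#           num_chunks=num_chunks,
+#           file_existence_policy=FileExistencePolicy.SKIP,
+#           global_seed=42,
+#       )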
+
+
def pack_encoded_data(
    config_dict: dict,
    file_existence_policy: FileExistencePolicy,
@@ -234,7 +361,8 @@ def pack_encoded_data(
    )

    if components.settings.dst_path.exists():
-        if not enforce_file_existence_policy(components.settings.dst_path, file_existence_policy):
+        stop_process = enforce_file_existence_policy(components.settings.dst_path, file_existence_policy)
+        if stop_process:
            return

    generator = PackedDataGenerator(