Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
// Parallelization configuration: when enabled, all manifest entries are loaded upfront and processed concurrently.
// Any single file failure propagates immediately (fail-fast), aborting the entire operation.
public static final String ENABLE_PARALLEL_PROCESSING = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".enableParallelProcessing";
// Property key for the number of manifest entries grouped into one batch during parallel processing.
// Batches are processed one after another; the entries inside a batch are processed on a parallel stream.
public static final String PARALLEL_BATCH_SIZE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".parallelBatchSize";

private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
private static final String DEFAULT_COMMON_FILES_PARENT = "/";
private static final boolean DEFAULT_SKIP_PERMISSION_CHECK = false;
// Parallel processing is on unless ENABLE_PARALLEL_PROCESSING is explicitly set otherwise.
private static final boolean DEFAULT_ENABLE_PARALLEL_PROCESSING = true;
// Batch size used when PARALLEL_BATCH_SIZE is absent from the supplied properties.
private static final int DEFAULT_PARALLEL_BATCH_SIZE = 100;

private final FileSystem srcFs;
private final FileSystem manifestReadFs;
Expand All @@ -92,6 +94,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
private final boolean enableSetPermissionPostPublish;
private final boolean skipPermissionCheck;
private final boolean enableParallelProcessing;
private final int parallelBatchSize;

public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestReadFs, final Path manifestPath, final Properties properties) {
this.srcFs = srcFs;
Expand All @@ -104,6 +107,7 @@ public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestRea
this.enableSetPermissionPostPublish = Boolean.parseBoolean(properties.getProperty(ENABLE_SET_PERMISSION_POST_PUBLISH, "true"));
this.skipPermissionCheck = Boolean.parseBoolean(properties.getProperty(SKIP_PERMISSION_CHECK, String.valueOf(DEFAULT_SKIP_PERMISSION_CHECK)));
this.enableParallelProcessing = Boolean.parseBoolean(properties.getProperty(ENABLE_PARALLEL_PROCESSING, String.valueOf(DEFAULT_ENABLE_PARALLEL_PROCESSING)));
this.parallelBatchSize = Integer.parseInt(properties.getProperty(PARALLEL_BATCH_SIZE, String.valueOf(DEFAULT_PARALLEL_BATCH_SIZE)));
}

@Override
Expand Down Expand Up @@ -226,24 +230,32 @@ private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource,
}

/**
 * Processes the given manifest entries in sequential batches, running the entries of each batch concurrently
 * on a parallel stream.
 *
 * <p>{@code allFiles} is split into consecutive sublists of at most {@code parallelBatchSize} entries, and a
 * batch is processed fully before the next one begins. Every entry is handed to {@code processFile} exactly
 * once. If processing any entry throws, the exception is wrapped in a {@link RuntimeException} that names
 * the file and propagates out of this method (fail-fast), so later batches are never started.
 *
 * <p>NOTE(review): the result lists and maps below are passed to {@code processFile} from several threads at
 * once; confirm that callers supply instances that are safe for concurrent use.
 *
 * @param allFiles manifest entries to process; an empty list results in no batches and no work
 * @param targetFs forwarded unchanged to {@code processFile}
 * @param configuration forwarded unchanged to {@code processFile}
 * @param permissionMap forwarded unchanged to {@code processFile}
 * @param copyEntities forwarded unchanged to {@code processFile}
 * @param toDelete forwarded unchanged to {@code processFile}
 * @param ancestorOwnerAndPermissions forwarded unchanged to {@code processFile}
 * @param ancestorOwnerAndPermissionsForSetPermissionStep forwarded unchanged to {@code processFile}
 * @param existingDirectoryPermissionsForSetPermissionStep forwarded unchanged to {@code processFile}
 * @throws RuntimeException if {@code processFile} fails for any entry; the original exception is the cause
 * @throws IllegalArgumentException if {@code parallelBatchSize} is not positive (raised by
 *         {@code Lists.partition}, assuming this is Guava's implementation)
 */
private void processFilesInParallel(List<CopyManifest.CopyableUnit> allFiles, FileSystem targetFs,
    CopyConfiguration configuration, Cache<String, OwnerAndPermission> permissionMap,
    List<CopyEntity> copyEntities, List<FileStatus> toDelete,
    Map<String, List<OwnerAndPermission>> ancestorOwnerAndPermissions,
    Map<String, List<OwnerAndPermission>> ancestorOwnerAndPermissionsForSetPermissionStep,
    Map<String, OwnerAndPermission> existingDirectoryPermissionsForSetPermissionStep) {
  // The batched loop below is the only pass over the files. An extra unbatched
  // allFiles.parallelStream() pass before it would process every entry twice and defeat the batching.
  List<List<CopyManifest.CopyableUnit>> batches = Lists.partition(allFiles, parallelBatchSize);
  int totalBatches = batches.size();
  log.info("Processing {} files in {} batches of up to {} files each", allFiles.size(), totalBatches, parallelBatchSize);
  int batchNum = 0;
  for (List<CopyManifest.CopyableUnit> batch : batches) {
    batchNum++;
    log.info("Processing batch {}/{} ({} files)", batchNum, totalBatches, batch.size());
    batch.parallelStream().forEach(file -> {
      try {
        processFile(file, targetFs, configuration, permissionMap, copyEntities, toDelete,
            ancestorOwnerAndPermissions, ancestorOwnerAndPermissionsForSetPermissionStep,
            existingDirectoryPermissionsForSetPermissionStep);
      } catch (Exception e) {
        // Lambdas passed to forEach cannot throw checked exceptions; wrap and keep the cause.
        throw new RuntimeException("Failed to process file: " + file.fileName, e);
      }
    });
  }
}

private void processFile(CopyManifest.CopyableUnit file, FileSystem targetFs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,34 @@ public void testParallelProcessingProducesSameResults() throws Exception {
}
}

/**
 * Runs parallel processing with a batch size below the number of manifest entries, so the work is spread
 * over more than one batch, and checks that the resulting file set has the expected size and publish steps.
 */
@Test
public void testParallelProcessingWithSmallBatchSize() throws Exception {
  Properties config = new Properties();
  config.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
  config.setProperty(ManifestBasedDataset.ENABLE_PARALLEL_PROCESSING, "true");
  // The sample manifest has 2 files, so a batch size of 1 yields 2 batches.
  config.setProperty(ManifestBasedDataset.PARALLEL_BATCH_SIZE, "1");
  Path manifest = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());

  try (FileSystem srcFsMock = Mockito.mock(FileSystem.class);
      FileSystem manifestFsMock = Mockito.mock(FileSystem.class);
      FileSystem dstFsMock = Mockito.mock(FileSystem.class)) {
    setSourceAndDestFsMocks(srcFsMock, dstFsMock, manifest, manifestFsMock, true);

    ManifestBasedDataset dataset = new ManifestBasedDataset(srcFsMock, manifestFsMock, manifest, config);
    CopyConfiguration copyConfig = CopyConfiguration.builder(dstFsMock, config).build();
    Iterator<FileSet<CopyEntity>> iterator = dataset.getFileSetIterator(dstFsMock, copyConfig);

    Assert.assertTrue(iterator.hasNext());
    FileSet<CopyEntity> produced = iterator.next();

    // Expected entities: 2 files to copy, then 1 pre-publish step, then 1 post-publish step.
    Assert.assertEquals(produced.getFiles().size(), 4);

    PrePublishStep prePublish = (PrePublishStep) produced.getFiles().get(2);
    Assert.assertTrue(prePublish.getStep() instanceof CreateDirectoryWithPermissionsCommitStep);

    PostPublishStep postPublish = (PostPublishStep) produced.getFiles().get(3);
    Assert.assertTrue(postPublish.getStep() instanceof SetPermissionCommitStep);
  }
}

/**
* Verifies that sequential processing (parallel disabled) produces the same copy entities.
*/
Expand Down
Loading