From e31417dbded1f9181105ad34fc172644081ca40f Mon Sep 17 00:00:00 2001 From: pratapAditya04 Date: Tue, 24 Mar 2026 14:06:20 +0530 Subject: [PATCH] Add batched parallel processing to ManifestBasedDataset Instead of submitting all files to parallelStream at once, files are now processed in configurable batches (default 100). Each batch is fully processed before the next begins, preventing unbounded concurrency on large manifests. New config key: gobblin.copy.manifestBased.parallelBatchSize (default 100) Co-Authored-By: Claude Sonnet 4.6 --- .../management/copy/ManifestBasedDataset.java | 34 +++++++++++++------ .../ManifestBasedDatasetFinderTest.java | 28 +++++++++++++++ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java index e25ac0b81f0..2126bf5e0d2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java @@ -76,11 +76,13 @@ public class ManifestBasedDataset implements IterableCopyableDataset { // Parallelization configuration: when enabled, all manifest entries are loaded upfront and processed concurrently. // Any single file failure propagates immediately (fail-fast), aborting the entire operation. 
public static final String ENABLE_PARALLEL_PROCESSING = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".enableParallelProcessing"; + public static final String PARALLEL_BATCH_SIZE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".parallelBatchSize"; private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30"; private static final String DEFAULT_COMMON_FILES_PARENT = "/"; private static final boolean DEFAULT_SKIP_PERMISSION_CHECK = false; private static final boolean DEFAULT_ENABLE_PARALLEL_PROCESSING = true; + private static final int DEFAULT_PARALLEL_BATCH_SIZE = 100; private final FileSystem srcFs; private final FileSystem manifestReadFs; @@ -92,6 +94,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset { private final boolean enableSetPermissionPostPublish; private final boolean skipPermissionCheck; private final boolean enableParallelProcessing; + private final int parallelBatchSize; public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestReadFs, final Path manifestPath, final Properties properties) { this.srcFs = srcFs; @@ -104,6 +107,7 @@ public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestRea this.enableSetPermissionPostPublish = Boolean.parseBoolean(properties.getProperty(ENABLE_SET_PERMISSION_POST_PUBLISH, "true")); this.skipPermissionCheck = Boolean.parseBoolean(properties.getProperty(SKIP_PERMISSION_CHECK, String.valueOf(DEFAULT_SKIP_PERMISSION_CHECK))); this.enableParallelProcessing = Boolean.parseBoolean(properties.getProperty(ENABLE_PARALLEL_PROCESSING, String.valueOf(DEFAULT_ENABLE_PARALLEL_PROCESSING))); + this.parallelBatchSize = Integer.parseInt(properties.getProperty(PARALLEL_BATCH_SIZE, String.valueOf(DEFAULT_PARALLEL_BATCH_SIZE))); } @Override @@ -226,8 +230,9 @@ private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, } /** - * Processes all files concurrently using a parallel stream. 
Any single file failure propagates immediately
-   * (fail-fast), causing the entire operation to abort with an exception.
+   * Processes all files in sequential batches, each batch being run as a parallel stream. Batches hold up to
+   * {@code parallelBatchSize} files (the default size is used if that value is not positive). A batch completes
+   * before the next starts; any single file failure propagates immediately (fail-fast), aborting the operation.
    */
   private void processFilesInParallel(List allFiles, FileSystem targetFs,
       CopyConfiguration configuration, Cache permissionMap,
@@ -235,15 +240,30 @@ private void processFilesInParallel(List allFiles, Fi
       Map> ancestorOwnerAndPermissions,
       Map> ancestorOwnerAndPermissionsForSetPermissionStep,
       Map existingDirectoryPermissionsForSetPermissionStep) {
-    allFiles.parallelStream().forEach(file -> {
-      try {
-        processFile(file, targetFs, configuration, permissionMap, copyEntities, toDelete,
-            ancestorOwnerAndPermissions, ancestorOwnerAndPermissionsForSetPermissionStep,
-            existingDirectoryPermissionsForSetPermissionStep);
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to process file: " + file.fileName, e);
-      }
-    });
+    // Lists.partition throws IllegalArgumentException for a size <= 0, so a misconfigured value falls back to
+    // the default instead of aborting the copy with an opaque error.
+    int batchSize = parallelBatchSize;
+    if (batchSize <= 0) {
+      log.warn("Invalid {} value {}; falling back to default batch size {}", PARALLEL_BATCH_SIZE, batchSize,
+          DEFAULT_PARALLEL_BATCH_SIZE);
+      batchSize = DEFAULT_PARALLEL_BATCH_SIZE;
+    }
+    List<List<CopyManifest.CopyableUnit>> batches = Lists.partition(allFiles, batchSize);
+    log.info("Processing {} files in {} batches of up to {} files each", allFiles.size(), batches.size(), batchSize);
+    int batchNum = 0;
+    for (List<CopyManifest.CopyableUnit> batch : batches) {
+      batchNum++;
+      log.info("Processing batch {}/{} ({} files)", batchNum, batches.size(), batch.size());
+      // The forEach call returns only after every file of this batch is done, so batches never overlap.
+      batch.parallelStream().forEach(file -> {
+        try {
+          processFile(file, targetFs, configuration, permissionMap, copyEntities, toDelete,
+              ancestorOwnerAndPermissions, ancestorOwnerAndPermissionsForSetPermissionStep,
+              existingDirectoryPermissionsForSetPermissionStep);
+        } catch (Exception e) {
+          throw new RuntimeException("Failed to process file: " + file.fileName, e);
+        }
+      });
+    }
   }
 
   private void processFile(CopyManifest.CopyableUnit file, FileSystem targetFs,
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index dfe0595c705..6b0777acd19 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -594,6 +594,34 @@ public void testParallelProcessingProducesSameResults() throws Exception {
     }
   }
 
+  /**
+   * Verifies that parallel processing with a batch size smaller than the file count (forcing multiple batches)
+   * still yields the expected copy entities followed by the pre-publish and post-publish steps.
+   */
+  @Test
+  public void testParallelProcessingWithSmallBatchSize() throws Exception {
+    Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+    props.setProperty(ManifestBasedDataset.ENABLE_PARALLEL_PROCESSING, "true");
+    props.setProperty(ManifestBasedDataset.PARALLEL_BATCH_SIZE, "1"); // 2 files, batch size 1 => 2 batches
+
+    try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
+        FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+        FileSystem destFs = Mockito.mock(FileSystem.class)) {
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, true);
+
+      Iterator<FileSet<CopyEntity>> fileSets =
+          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs,
+              CopyConfiguration.builder(destFs, props).build());
+      Assert.assertTrue(fileSets.hasNext());
+      FileSet<CopyEntity> fileSet = fileSets.next();
+      Assert.assertEquals(fileSet.getFiles().size(), 4); // 2 files to copy + 1 pre publish step + 1 post publish step
+      Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(2)).getStep() instanceof CreateDirectoryWithPermissionsCommitStep);
+      Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(3)).getStep() instanceof SetPermissionCommitStep);
+    }
+  }
+
   /**
    * Verifies that sequential processing (parallel disabled) produces the same copy entities.
    */