diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java index e25ac0b81f0..2126bf5e0d2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java @@ -76,11 +76,13 @@ public class ManifestBasedDataset implements IterableCopyableDataset { // Parallelization configuration: when enabled, all manifest entries are loaded upfront and processed concurrently. // Any single file failure propagates immediately (fail-fast), aborting the entire operation. public static final String ENABLE_PARALLEL_PROCESSING = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".enableParallelProcessing"; + public static final String PARALLEL_BATCH_SIZE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".parallelBatchSize"; private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30"; private static final String DEFAULT_COMMON_FILES_PARENT = "/"; private static final boolean DEFAULT_SKIP_PERMISSION_CHECK = false; private static final boolean DEFAULT_ENABLE_PARALLEL_PROCESSING = true; + private static final int DEFAULT_PARALLEL_BATCH_SIZE = 100; private final FileSystem srcFs; private final FileSystem manifestReadFs; @@ -92,6 +94,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset { private final boolean enableSetPermissionPostPublish; private final boolean skipPermissionCheck; private final boolean enableParallelProcessing; + private final int parallelBatchSize; public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestReadFs, final Path manifestPath, final Properties properties) { this.srcFs = srcFs; @@ -104,6 +107,7 @@ public ManifestBasedDataset(final FileSystem srcFs, final FileSystem manifestRea this.enableSetPermissionPostPublish = Boolean.parseBoolean(properties.getProperty(ENABLE_SET_PERMISSION_POST_PUBLISH, "true")); this.skipPermissionCheck = Boolean.parseBoolean(properties.getProperty(SKIP_PERMISSION_CHECK, String.valueOf(DEFAULT_SKIP_PERMISSION_CHECK))); this.enableParallelProcessing = Boolean.parseBoolean(properties.getProperty(ENABLE_PARALLEL_PROCESSING, String.valueOf(DEFAULT_ENABLE_PARALLEL_PROCESSING))); + this.parallelBatchSize = Integer.parseInt(properties.getProperty(PARALLEL_BATCH_SIZE, String.valueOf(DEFAULT_PARALLEL_BATCH_SIZE))); } @Override @@ -226,8 +230,9 @@ private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, } /** - * Processes all files concurrently using a parallel stream. Any single file failure propagates immediately - * (fail-fast), causing the entire operation to abort with an exception. + * Processes all files concurrently in batches using a parallel stream. Files are divided into batches of + * {@code parallelBatchSize} and each batch is processed fully before the next begins. Any single file failure + * propagates immediately (fail-fast), aborting the entire operation. */ private void processFilesInParallel(List allFiles, FileSystem targetFs, CopyConfiguration configuration, Cache permissionMap, @@ -235,15 +240,22 @@ private void processFilesInParallel(List allFiles, Fi Map> ancestorOwnerAndPermissions, Map> ancestorOwnerAndPermissionsForSetPermissionStep, Map existingDirectoryPermissionsForSetPermissionStep) { - allFiles.parallelStream().forEach(file -> { - try { - processFile(file, targetFs, configuration, permissionMap, copyEntities, toDelete, - ancestorOwnerAndPermissions, ancestorOwnerAndPermissionsForSetPermissionStep, - existingDirectoryPermissionsForSetPermissionStep); - } catch (Exception e) { - throw new RuntimeException("Failed to process file: " + file.fileName, e); - } - }); + List> batches = Lists.partition(allFiles, parallelBatchSize); + log.info("Processing {} files in {} batches of up to {} files each", allFiles.size(), batches.size(), parallelBatchSize); + int batchNum = 0; + for (List batch : batches) { + batchNum++; + log.info("Processing batch {}/{} ({} files)", batchNum, batches.size(), batch.size()); + batch.parallelStream().forEach(file -> { + try { + processFile(file, targetFs, configuration, permissionMap, copyEntities, toDelete, + ancestorOwnerAndPermissions, ancestorOwnerAndPermissionsForSetPermissionStep, + existingDirectoryPermissionsForSetPermissionStep); + } catch (Exception e) { + throw new RuntimeException("Failed to process file: " + file.fileName, e); + } + }); + } } private void processFile(CopyManifest.CopyableUnit file, FileSystem targetFs, diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java index dfe0595c705..6b0777acd19 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java @@ -594,6 +594,34 @@ public void testParallelProcessingProducesSameResults() throws Exception { } } + /** + * Verifies that parallel processing with a batch size smaller than the file count (forcing multiple batches) + * still produces the same results as a single batch. + */ + @Test + public void testParallelProcessingWithSmallBatchSize() throws Exception { + Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath()); + Properties props = new Properties(); + props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/"); + props.setProperty(ManifestBasedDataset.ENABLE_PARALLEL_PROCESSING, "true"); + props.setProperty(ManifestBasedDataset.PARALLEL_BATCH_SIZE, "1"); // 2 files, batch size 1 => 2 batches + + try (FileSystem sourceFs = Mockito.mock(FileSystem.class); + FileSystem manifestReadFs = Mockito.mock(FileSystem.class); + FileSystem destFs = Mockito.mock(FileSystem.class)) { + setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, true); + + Iterator> fileSets = + new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, + CopyConfiguration.builder(destFs, props).build()); + Assert.assertTrue(fileSets.hasNext()); + FileSet fileSet = fileSets.next(); + Assert.assertEquals(fileSet.getFiles().size(), 4); // 2 files to copy + 1 pre publish step + 1 post publish step + Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(2)).getStep() instanceof CreateDirectoryWithPermissionsCommitStep); + Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(3)).getStep() instanceof SetPermissionCommitStep); + } + } + /** * Verifies that sequential processing (parallel disabled) produces the same copy entities. */