diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java index b6eff3c80..fd2210d0b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java @@ -54,7 +54,7 @@ *
  • Driver creates BulkWriterContext using constructor
  • *
  • Driver extracts BulkWriterConfig in {@link CassandraBulkSourceRelation} constructor
  • *
  • BulkWriterConfig gets broadcast to executors
  • - *
  • Executors reconstruct BulkWriterContext via {@link BulkWriterContext#from(BulkWriterConfig)}
  • + *
  • Executors reconstruct BulkWriterContext via {@link BulkWriterConfig#toBulkWriterContext()}
  • * * *

    Broadcastable wrappers used in BulkWriterConfig: @@ -112,7 +112,7 @@ protected AbstractBulkWriterContext(@NotNull BulkSparkConf conf, /** * Constructor for executor usage. * Reconstructs components from broadcast configuration on executors. - * This is used by the factory method {@link BulkWriterContext#from(BulkWriterConfig)}. + * This is used by {@link BulkWriterConfig#toBulkWriterContext()}. * * @param config immutable configuration for the bulk writer with pre-computed values */ diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java index e2a3283aa..c58753f1d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java @@ -21,6 +21,7 @@ import java.io.Serializable; +import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext; import org.jetbrains.annotations.NotNull; /** @@ -42,10 +43,10 @@ * and minimize Spark SizeEstimator overhead. *

    * On executors, {@link BulkWriterContext} instances are reconstructed from this config using - * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the broadcastable - * wrappers and reconstructs the full implementations with fresh data from Cassandra Sidecar. + * {@link #toBulkWriterContext()}, which creates the appropriate context implementation and + * reconstructs the full implementations with fresh data from Cassandra Sidecar. */ -public final class BulkWriterConfig implements Serializable +public class BulkWriterConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -111,4 +112,17 @@ public String getLowestCassandraVersion() { return lowestCassandraVersion; } + + /** + * Factory method that reconstructs a {@link BulkWriterContext} on executors from this broadcast config. + * Subclasses may override this to return custom context implementations for specialized reconstruction. + * + * @return a new BulkWriterContext instance appropriate for the current configuration + */ + public BulkWriterContext toBulkWriterContext() + { + return getConf().isCoordinatedWriteConfigured() + ? new CassandraCoordinatedBulkWriterContext(this) + : new CassandraBulkWriterContext(this); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java index a76b7c60f..8f240215c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java @@ -22,6 +22,7 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext; import org.apache.cassandra.spark.common.stats.JobStatsPublisher; import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.spark.api.java.JavaSparkContext; /** * Context for bulk write operations, providing access to cluster, job, schema, and transport information. @@ -29,7 +30,7 @@ * Serialization Architecture: * This interface does NOT extend Serializable. BulkWriterContext instances are never broadcast to executors. * Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct BulkWriterContext instances - * from the config using the factory method {@link #from(BulkWriterConfig)}. + * from the config using {@link BulkWriterConfig#toBulkWriterContext()}. *

    * The implementations ({@link CassandraBulkWriterContext}, {@link CassandraCoordinatedBulkWriterContext}) * do NOT have serialVersionUID fields as they are never serialized. @@ -53,23 +54,12 @@ public interface BulkWriterContext TransportContext transportContext(); /** - * Factory method to create a BulkWriterContext from a BulkWriterConfig on executors. - * This method reconstructs context instances on executors from the broadcast configuration. - * The driver creates contexts directly using constructors, not this method. + * Converts this context into an immutable {@link BulkWriterConfig} suitable for broadcasting to executors. + * Executors reconstruct a full {@link BulkWriterContext} from the config via + * {@link BulkWriterConfig#toBulkWriterContext()}. * - * @param config the immutable configuration object broadcast from driver - * @return a new BulkWriterContext instance + * @param sparkContext the Spark context (used to obtain default parallelism) + * @return an immutable config containing all broadcastable state */ - static BulkWriterContext from(BulkWriterConfig config) - { - BulkSparkConf conf = config.getConf(); - if (conf.isCoordinatedWriteConfigured()) - { - return new CassandraCoordinatedBulkWriterContext(config); - } - else - { - return new CassandraBulkWriterContext(config); - } - } + BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index f6323af36..44bd5e166 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -42,7 +42,6 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult; import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator; import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator; -import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf; @@ -90,69 +89,13 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s this.sqlContext = sqlContext; this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()); // Extract immutable configuration from the context for broadcasting - BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism()); + BulkWriterConfig config = writerContext.toBulkWriterConfigForBroadcasting(sparkContext); this.broadcastConfig = sparkContext.broadcast(config); ReplicaAwareFailureHandler failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); this.simpleTaskScheduler = new SimpleTaskScheduler(); } - /** - * Extracts immutable configuration from a BulkWriterContext for broadcasting. - * Creates BroadcastableCluster, BroadcastableJobInfo, and BroadcastableSchemaInfo - * to ensure zero transient fields and avoid Logger references in the broadcast object. - */ - private static BulkWriterConfig extractConfig(BulkWriterContext context, int sparkDefaultParallelism) - { - if (context instanceof AbstractBulkWriterContext) - { - AbstractBulkWriterContext abstractContext = (AbstractBulkWriterContext) context; - ClusterInfo originalClusterInfo = abstractContext.cluster(); - - // Create BroadcastableCluster to avoid transient fields in broadcast - IBroadcastableClusterInfo broadcastableClusterInfo; - if (originalClusterInfo instanceof CassandraClusterInfoGroup) - { - // Coordinated write scenario - @SuppressWarnings("unchecked") - CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo; - broadcastableClusterInfo = BroadcastableClusterInfoGroup.from( - multiCluster, - abstractContext.bulkSparkConf() - ); - } - else - { - // Single cluster scenario - broadcastableClusterInfo = BroadcastableClusterInfo.from( - originalClusterInfo, - abstractContext.bulkSparkConf() - ); - } - - // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner - BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from( - abstractContext.job(), - abstractContext.bulkSparkConf() - ); - - // Create BroadcastableSchemaInfo to avoid Logger in TableSchema - BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from( - abstractContext.schema() - ); - - return new BulkWriterConfig( - abstractContext.bulkSparkConf(), - sparkDefaultParallelism, - broadcastableJobInfo, - broadcastableClusterInfo, - broadcastableSchemaInfo, - abstractContext.lowestCassandraVersion() - ); - } - throw new IllegalArgumentException("Cannot extract config from context type: " + context.getClass().getName()); - } - @Override @NotNull public SQLContext sqlContext() diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 5c5168374..f54b7190e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -45,7 +46,7 @@ protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf, } /** - * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)} factory method. + * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}. * This constructor is only used on executors to reconstruct context from broadcast config. * * @param config immutable configuration for the bulk writer @@ -81,4 +82,21 @@ protected MultiClusterContainer generateRestoreJobIds() { return MultiClusterContainer.ofSingle(bridge().getTimeUUID()); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfo.from(cluster(), bulkSparkConf()); + BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); + BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); + + return new BulkWriterConfig( + bulkSparkConf(), + sparkContext.defaultParallelism(), + broadcastableJobInfo, + broadcastableClusterInfo, + broadcastableSchemaInfo, + lowestCassandraVersion() + ); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index ad143fb0c..a9c120871 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -65,24 +65,32 @@ /** * Driver-only implementation of {@link ClusterInfo} for single cluster operations. *

    - * This class is NOT serialized and does NOT have a serialVersionUID. - * When broadcasting to executors, the driver extracts information from this class - * and creates a {@link BroadcastableClusterInfo} instance, which is then included - * in the {@link BulkWriterConfig} that gets broadcast. + * This class is NOT serialized. When broadcasting to executors, the driver extracts + * broadcast-safe fields via {@link BroadcastableClusterInfo#from(ClusterInfo, BulkSparkConf)} + * and includes the result in the {@link BulkWriterConfig} that gets broadcast. *

    - * This class implements Serializable only because the {@link ClusterInfo} interface - * requires it (for use as a field type in broadcast classes), but instances of this - * class are never directly serialized. + * On executors, a new instance is reconstructed from {@link BroadcastableClusterInfo} + * using {@link #CassandraClusterInfo(BroadcastableClusterInfo)}, reusing broadcast-safe + * fields and fetching other data fresh from Sidecar. + * + * @see BroadcastableClusterInfo for the broadcast-safe subset of fields */ public class CassandraClusterInfo implements ClusterInfo, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfo.class); + // -- Broadcast-safe fields -- + // Extracted by BroadcastableClusterInfo.from() and sent to executors. + // Changes here must be reflected in BroadcastableClusterInfo. protected final BulkSparkConf conf; protected final String clusterId; protected String cassandraVersion; protected Partitioner partitioner; + // -- Driver-only fields (not broadcast) -- + // NOT included in BroadcastableClusterInfo. Either expensive to serialize + // (token mappings, schema) or non-serializable (CassandraContext, Futures). + // Executors reconstruct these fresh from Sidecar via CassandraClusterInfo(BroadcastableClusterInfo). protected volatile TokenRangeMapping tokenRangeReplicas; protected volatile String keyspaceSchema; protected volatile ReplicationFactor replicationFactor; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index 33d094e41..d707a651b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -93,7 +93,7 @@ public class RecordWriter */ public RecordWriter(BulkWriterConfig config, String[] columnNames) { - this(BulkWriterContext.from(config), columnNames, TaskContext::get, SortedSSTableWriter::new); + this(config.toBulkWriterContext(), columnNames, TaskContext::get, SortedSSTableWriter::new); } @VisibleForTesting diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java index 9fd870cdc..4a434a7a5 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java @@ -48,6 +48,7 @@ import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo; import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo; import org.apache.cassandra.spark.bulkwriter.WriteAvailability; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; @@ -136,7 +137,14 @@ public static CassandraClusterInfoGroup fromBulkSparkConf(BulkSparkConf conf, Fu return new CassandraClusterInfoGroup(clusterInfos); } - @VisibleForTesting // ONLY FOR TESTING + /** + * Creates a {@link CassandraClusterInfoGroup} from a pre-built list of {@link ClusterInfo} instances. + * This factory is intended for custom {@link IBroadcastableClusterInfo} implementations that reconstruct + * cluster infos individually and need to wrap them in a group. + * + * @param clusterInfos the list of already-reconstructed ClusterInfo instances + * @return a new CassandraClusterInfoGroup + */ public static CassandraClusterInfoGroup createFrom(List clusterInfos) { return new CassandraClusterInfoGroup(clusterInfos); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java index 5329fc213..9b00c7a1d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java @@ -25,10 +25,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext; +import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.BroadcastableJobInfo; +import org.apache.cassandra.spark.bulkwriter.BroadcastableSchemaInfo; import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig; import org.apache.cassandra.spark.bulkwriter.ClusterInfo; import org.apache.cassandra.spark.bulkwriter.DataTransport; +import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -50,7 +55,7 @@ public CassandraCoordinatedBulkWriterContext(@NotNull BulkSparkConf conf, } /** - * Constructor used by {@link org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)} factory method. + * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}. * This constructor is only used on executors to reconstruct context from broadcast config. * * @param config immutable configuration for the bulk writer @@ -115,4 +120,22 @@ protected CassandraClusterInfoGroup clusterInfoGroup() { return (CassandraClusterInfoGroup) cluster(); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + CassandraClusterInfoGroup multiCluster = clusterInfoGroup(); + IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf()); + BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); + BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); + + return new BulkWriterConfig( + bulkSparkConf(), + sparkContext.defaultParallelism(), + broadcastableJobInfo, + broadcastableClusterInfo, + broadcastableSchemaInfo, + lowestCassandraVersion() + ); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java new file mode 100644 index 000000000..380c44600 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.util.UUID; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer; +import org.apache.cassandra.spark.common.stats.JobStatsPublisher; +import org.jetbrains.annotations.NotNull; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests that verify the extensibility contract for the bulk writer broadcast/reconstruction chain. + * These tests prove that downstream implementations can: + *

    + */ +class BulkWriterConfigExtensibilityTest +{ + @Test + void testToBulkWriterContextCanBeOverridden() + { + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + IBroadcastableClusterInfo mockClusterInfo = mock(IBroadcastableClusterInfo.class); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + + // A custom BulkWriterConfig subclass overriding toBulkWriterContext() + BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0") + { + @Override + public BulkWriterContext toBulkWriterContext() + { + return mock(BulkWriterContext.class); + } + }; + + BulkWriterContext context = customConfig.toBulkWriterContext(); + assertThat(context).isNotNull(); + // The base class would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext, + // but our subclass returns a mock — proving the override is dispatched. + assertThat(context).isNotInstanceOf(CassandraBulkWriterContext.class); + } + + @Test + void testCustomIBroadcastableClusterInfoReconstructIsCalled() + { + ClusterInfo expectedCluster = mock(ClusterInfo.class); + IBroadcastableClusterInfo mockBroadcastable = mock(IBroadcastableClusterInfo.class); + when(mockBroadcastable.reconstruct()).thenReturn(expectedCluster); + + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockBroadcastable, mockSchemaInfo, "4.0.0"); + + TestBulkWriterContext context = new TestBulkWriterContext(config); + + assertThat(context.cluster()).isSameAs(expectedCluster); + } + + @Test + void testReconstructJobInfoOnExecutorCanBeOverridden() + { + JobInfo expectedJobInfo = mock(JobInfo.class); + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, mock(IBroadcastableClusterInfo.class), mockSchemaInfo, "4.0.0"); + + // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo + TestBulkWriterContext context = new TestBulkWriterContext(config) + { + @Override + protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) + { + return expectedJobInfo; + } + }; + + assertThat(context.job()).isSameAs(expectedJobInfo); + } + + /** + * Minimal AbstractBulkWriterContext subclass for testing executor-side reconstruction + * without requiring real Cassandra infrastructure. + */ + private static class TestBulkWriterContext extends AbstractBulkWriterContext + { + TestBulkWriterContext(@NotNull BulkWriterConfig config) + { + super(config); + } + + @Override + protected ClusterInfo buildClusterInfo() + { + throw new UnsupportedOperationException("Driver-only"); + } + + @Override + protected void validateKeyspaceReplication() + { + } + + @Override + protected MultiClusterContainer generateRestoreJobIds() + { + throw new UnsupportedOperationException("Driver-only"); + } + + @Override + protected CassandraBridge buildCassandraBridge() + { + return mock(CassandraBridge.class); + } + + @Override + protected TransportContext buildTransportContext(boolean isOnDriver) + { + return mock(TransportContext.class); + } + + @Override + protected JobStatsPublisher buildJobStatsPublisher() + { + return mock(JobStatsPublisher.class); + } + + @Override + protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) + { + return mock(JobInfo.class); + } + + @Override + protected SchemaInfo reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo) + { + return mock(SchemaInfo.class); + } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(org.apache.spark.api.java.JavaSparkContext sparkContext) + { + throw new UnsupportedOperationException("Not needed for test"); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index d9cefac43..cb5564f0c 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -60,6 +60,7 @@ import org.apache.cassandra.spark.exception.SidecarApiCallException; import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; import org.apache.cassandra.spark.validation.StartupValidator; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -551,4 +552,10 @@ public void startupValidate() { StartupValidator.instance().perform(); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + throw new UnsupportedOperationException("Not implemented in mock"); + } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java index fd87abd59..80867b2dd 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.stream.Collectors; -import com.datastax.driver.core.exceptions.ReadTimeoutException; import org.junit.jupiter.api.Test; import net.bytebuddy.ByteBuddy; @@ -204,7 +203,10 @@ void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException } catch (Exception e) { - if (attempt == 10 || !(e instanceof ReadTimeoutException)) + // ReadTimeoutException here is of type org.apache.cassandra.exceptions.ReadTimeoutException + // which is not available on the integration-tests compile classpath + // Hence checking for class name instead of using instanceof + if (attempt == 10 || !e.getClass().getName().endsWith("ReadTimeoutException")) { throw e; }