diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0aed28e92558f..99c00cb60caf5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2695,6 +2695,16 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("128MB") // parquet.block.size
+ val ARCHIVE_FORMAT_ENABLED = buildConf("spark.sql.files.archive.enabled")
+ .doc("When true, the CSV data source can read tar archives (.tar, .tar.gz, .tgz): each " +
+ "archive is read as a single split and its entries are streamed through the CSV parser " +
+ "(never unpacked to disk), as if the entries were separate CSV files. Only the CSV data " +
+ "source supports reading archives.")
+ .version("4.3.0")
+ .withBindingPolicy(ConfigBindingPolicy.SESSION)
+ .booleanConf
+ .createWithDefault(false)
+
val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
.internal()
.doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 1261200a9173c..e6673c9069f42 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -293,6 +293,10 @@
bcprov-jdk18on
test
+
+ org.apache.commons
+ commons-compress
+
org.bouncycastle
bcpkix-jdk18on
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala
new file mode 100644
index 0000000000000..ce14d45d540f4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{Closeable, FilterInputStream, InputStream}
+import java.util.Locale
+import java.util.zip.GZIPInputStream
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.TaskContext
+
+/**
+ * Streaming reader for tar archives: plain `.tar`, gzipped `.tar.gz`, and `.tgz`.
+ *
+ * The archive is opened once and decompressed/unpacked as a stream -- entries are never
+ * materialized to local disk. [[readEntries]] hands each entry's bytes to a caller-supplied
+ * parse function as a bounded [[InputStream]] and concatenates the per-entry results into a
+ * single iterator, advancing to the next entry only once the current one is fully consumed. At
+ * most one entry is in flight at a time, so memory stays bounded regardless of archive size.
+ *
+ * This is format-agnostic: a `FileFormat` whose per-file reader can consume an `InputStream`
+ * (e.g. CSV via `UnivocityParser`) wires up archive support by calling [[readEntries]] from its
+ * read/inference paths and supplying a `parseEntry` that turns one entry stream into rows (or
+ * tokens). Formats that need random access within a file (Parquet/ORC footers) cannot use this
+ * streaming path.
+ *
+ * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing `.gz` extension and
+ * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream in
+ * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, so the gzip layer
+ * is unwrapped explicitly here.
+ */
+object ArchiveReader {
+
+ def isArchivePath(path: Path): Boolean = {
+ val name = path.getName.toLowerCase(Locale.ROOT)
+ name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz")
+ }
+
+ // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip layer here.
+ private def needsExplicitGunzip(path: Path): Boolean =
+ path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz")
+
+ private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = {
+ if (entry.isDirectory) return true
+ val name = entry.getName
+ val basename = name.substring(name.lastIndexOf('/') + 1)
+ basename.startsWith(".")
+ }
+
+ /** Opens `path` as a tar stream, transparently decompressing `.tar.gz` / `.tgz`. */
+ private def openTarStream(conf: Configuration, path: Path): TarArchiveInputStream = {
+ val base = CodecStreams.createInputStreamWithCloseResource(conf, path)
+ val tarBytes = if (needsExplicitGunzip(path)) new GZIPInputStream(base) else base
+ new TarArchiveInputStream(tarBytes)
+ }
+
+ /**
+ * A view over the shared tar stream that reads exactly the current entry's bytes
+ * (`TarArchiveInputStream.read` returns -1 at the entry boundary) and ignores `close()`, so a
+ * parser closing its input does not close the underlying archive. Any unread remainder of an
+ * entry is skipped by `getNextEntry()` when advancing.
+ */
+ private final class EntryInputStream(tar: TarArchiveInputStream)
+ extends FilterInputStream(tar) {
+ override def close(): Unit = ()
+ }
+
+ /**
+ * Streams `path` entry by entry, applying `parseEntry` to each non-skipped entry's
+ * `(name, stream)` and concatenating the results into a single iterator. Directories and OS
+ * sidecar dotfiles (basename starting with `.`, e.g. macOS `._x` / `.DS_Store`) are skipped.
+ *
+ * The next entry is opened only once the current entry's iterator is exhausted, so nothing is
+ * buffered to disk and at most one entry's bytes are read at a time. The archive stream is
+ * closed when the returned iterator is exhausted, when [[Closeable.close]] is called on it, and
+ * (defensively) on task completion.
+ */
+ def readEntries[T](
+ path: Path,
+ conf: Configuration)(
+ parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = {
+ val tar = openTarStream(conf, path)
+ var closed = false
+
+ def cleanup(): Unit = {
+ if (!closed) {
+ closed = true
+ try tar.close() catch { case NonFatal(_) => }
+ }
+ }
+
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup()))
+
+ new Iterator[T] with Closeable {
+ private var currentIter: Iterator[T] = Iterator.empty
+ private var done = false
+
+ // Move to the next entry whose iterator has elements (releasing each exhausted entry's
+ // reader and skipping any unread bytes), or mark the stream done once entries run out.
+ // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in
+ // `next` is essential for parsers like `UnivocityParser` that reuse a single mutable row and
+ // look ahead on `hasNext`: probing the current entry right after returning a row would
+ // overwrite that row's contents before the caller has copied it.
+ private def advance(): Unit = {
+ while (!done && !currentIter.hasNext) {
+ currentIter match {
+ case c: Closeable => try c.close() catch { case NonFatal(_) => }
+ case _ =>
+ }
+ var entry = tar.getNextEntry
+ while (entry != null && shouldSkipEntry(entry)) entry = tar.getNextEntry
+ if (entry == null) {
+ done = true
+ cleanup()
+ } else {
+ currentIter = parseEntry(entry.getName, new EntryInputStream(tar))
+ }
+ }
+ }
+
+ // Open the first entry eagerly so construction reflects the archive's first entry.
+ advance()
+
+ override def hasNext: Boolean = {
+ advance()
+ !done && currentIter.hasNext
+ }
+
+ override def next(): T = {
+ if (!hasNext) throw new NoSuchElementException
+ currentIter.next()
+ }
+
+ override def close(): Unit = {
+ done = true
+ currentIter = Iterator.empty
+ cleanup()
+ }
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 596edc8beaa34..c0640cbc20711 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.csv
-import java.io.{FileNotFoundException, IOException}
+import java.io.{FileNotFoundException, InputStream, IOException}
import java.nio.charset.{Charset, StandardCharsets}
import scala.util.control.NonFatal
@@ -25,8 +25,10 @@ import scala.util.control.NonFatal
import com.univocity.parsers.csv.CsvParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.util.LineReader
import org.apache.spark.TaskContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
@@ -42,6 +44,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -52,14 +55,18 @@ abstract class CSVDataSource extends Serializable {
def isSplitable: Boolean
/**
- * Parse a [[PartitionedFile]] into [[InternalRow]] instances.
+ * Parse a [[PartitionedFile]] into [[InternalRow]] instances. When `archiveReadEnabled` is set
+ * and `file` is a tar archive, its entries are streamed through the parser (see [[readArchive]]);
+ * otherwise the file is parsed directly. `getHeaderChecker` / `getParser` build a fresh header
+ * checker / parser -- per entry for archives, once for a regular file.
*/
def readFile(
conf: Configuration,
file: PartitionedFile,
- parser: UnivocityParser,
- headerChecker: CSVHeaderChecker,
- requiredSchema: StructType): Iterator[InternalRow]
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType,
+ archiveReadEnabled: Boolean): Iterator[InternalRow]
/**
* Infers the schema from `inputPaths` files.
@@ -71,8 +78,24 @@ abstract class CSVDataSource extends Serializable {
parsedOptions.singleVariantColumn match {
case Some(columnName) => Some(StructType(Array(StructField(columnName, VariantType))))
case None =>
- if (inputPaths.nonEmpty) {
- Some(infer(sparkSession, inputPaths, parsedOptions))
+ // Tar archives are inferred by streaming their entries (never unpacked to disk); any
+ // non-archive files are inferred normally and the two schemas are merged.
+ val (archives, nonArchives) =
+ if (sparkSession.sessionState.conf.getConf(SQLConf.ARCHIVE_FORMAT_ENABLED)) {
+ inputPaths.partition(f => ArchiveReader.isArchivePath(f.getPath))
+ } else {
+ (Seq.empty[FileStatus], inputPaths)
+ }
+ if (archives.nonEmpty) {
+ val archiveSchema = inferArchives(sparkSession, archives, parsedOptions)
+ if (nonArchives.nonEmpty) {
+ Some(mergeSchemas(
+ infer(sparkSession, nonArchives, parsedOptions), archiveSchema, parsedOptions))
+ } else {
+ Some(archiveSchema)
+ }
+ } else if (nonArchives.nonEmpty) {
+ Some(infer(sparkSession, nonArchives, parsedOptions))
} else {
None
}
@@ -83,6 +106,105 @@ abstract class CSVDataSource extends Serializable {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType
+
+ /**
+ * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the CSV parser without
+ * unpacking it to disk. The whole archive is a single split (see `CSVFileFormat.isSplitable`); a
+ * fresh header checker and parser are built per entry so each entry is parsed exactly like a
+ * standalone CSV file -- its header, if any, validated and dropped independently. The
+ * mode-specific `parseEntry` turns one entry into rows via `parseStream` / `parseIterator`.
+ *
+ * @param conf Hadoop configuration used to open and (when needed) decompress the archive.
+ * @param file the archive file; its bytes are never materialized to disk.
+ * @param getParser builds a fresh [[UnivocityParser]].
+ * @param getHeaderChecker builds a fresh [[CSVHeaderChecker]] for `(isStartOfFile, source)`,
+ * where `source` (`CSV archive entry: !/`) names the
+ * entry in error messages.
+ * @param parseEntry parses one entry's `(parser, headerChecker, stream)` into rows.
+ * @return the concatenated rows of every non-skipped entry, read one entry at a time.
+ */
+ protected def readArchive(
+ conf: Configuration,
+ file: PartitionedFile,
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker)(
+ parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow])
+ : Iterator[InternalRow] = {
+ ArchiveReader.readEntries(file.toPath, conf) { (entryName, in) =>
+ val headerChecker =
+ getHeaderChecker(true, s"CSV archive entry: ${file.urlEncodedPath}!/$entryName")
+ val parser = getParser()
+ parseEntry(parser, headerChecker, in)
+ }
+ }
+
+ /**
+ * Infers a CSV schema from tar archives (`.tar`/`.tar.gz`/`.tgz`) by streaming their entries --
+ * the archive is never unpacked to disk. Each entry is tokenized like a standalone CSV file (its
+ * header row dropped when `header` is set), and all entries' rows feed a single
+ * [[CSVInferSchema]] pass keyed on the first entry's header, so the result matches reading the
+ * entries as separate files. Mirrors [[MultiLineCSVDataSource.infer]] but iterates tar entries.
+ */
+ private def inferArchives(
+ sparkSession: SparkSession,
+ archives: Seq[FileStatus],
+ parsedOptions: CSVOptions): StructType = {
+ val archiveRdd = createArchiveBaseRdd(sparkSession, archives, parsedOptions)
+ def tokens(dropHeader: Boolean): RDD[Array[String]] = archiveRdd.flatMap { stream =>
+ val path = new Path(stream.getPath())
+ ArchiveReader.readEntries(path, stream.getConfiguration) { (_, in) =>
+ UnivocityParser.tokenizeStream(
+ in, dropHeader, new CsvParser(parsedOptions.asParserSettings), parsedOptions.charset)
+ }
+ }
+ tokens(dropHeader = false).take(1).headOption match {
+ case Some(firstRow) =>
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val sampled = CSVUtils.sample(tokens(dropHeader = parsedOptions.headerFlag), parsedOptions)
+ SQLExecution.withSQLConfPropagated(sparkSession) {
+ new CSVInferSchema(parsedOptions).infer(sampled, header)
+ }
+ case None =>
+ StructType(Nil)
+ }
+ }
+
+ /** One `PortableDataStream` per archive file, like `MultiLineCSVDataSource.createBaseRdd`. */
+ private def createArchiveBaseRdd(
+ sparkSession: SparkSession,
+ inputPaths: Seq[FileStatus],
+ options: CSVOptions): RDD[PortableDataStream] = {
+ val paths = inputPaths.map(_.getPath)
+ val name = paths.mkString(",")
+ val job = Job.getInstance(
+ sparkSession.sessionState.newHadoopConfWithOptions(options.parameters))
+ FileInputFormat.setInputPaths(job, paths: _*)
+ val rdd = new BinaryFileRDD(
+ sparkSession.sparkContext,
+ classOf[StreamInputFormat],
+ classOf[String],
+ classOf[PortableDataStream],
+ job.getConfiguration,
+ sparkSession.sparkContext.defaultMinPartitions)
+ rdd.setName(s"CSVArchiveFile: $name").values
+ }
+
+ /**
+ * Positionally merges two already-inferred CSV schemas (the non-archive vs archive results),
+ * widening per-column types via [[CSVInferSchema.mergeRowTypes]] and keeping the longer header --
+ * the same first-header, positional model used for a multi-file CSV read.
+ */
+ private def mergeSchemas(
+ left: StructType,
+ right: StructType,
+ parsedOptions: CSVOptions): StructType = {
+ val inferSchema = new CSVInferSchema(parsedOptions)
+ val mergedTypes =
+ inferSchema.mergeRowTypes(left.map(_.dataType).toArray, right.map(_.dataType).toArray)
+ val header = (if (left.length >= right.length) left else right).map(_.name).toArray
+ StructType(inferSchema.toStructFields(mergedTypes, header))
+ }
}
object CSVDataSource extends Logging {
@@ -126,9 +248,20 @@ object TextInputCSVDataSource extends CSVDataSource {
override def readFile(
conf: Configuration,
file: PartitionedFile,
- parser: UnivocityParser,
- headerChecker: CSVHeaderChecker,
- requiredSchema: StructType): Iterator[InternalRow] = {
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType,
+ archiveReadEnabled: Boolean): Iterator[InternalRow] = {
+ if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) {
+ // Stream each tar entry through the line-based parser, treating the entry exactly like a
+ // standalone CSV file (a fresh parser/header checker is built per entry).
+ return readArchive(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) =>
+ UnivocityParser.parseIterator(
+ archiveLines(in, parser.options), parser, headerChecker, requiredSchema)
+ }
+ }
+ val parser = getParser()
+ val headerChecker = getHeaderChecker(file.start == 0, s"CSV file: ${file.urlEncodedPath}")
val lines = {
val linesReader = Utils.createResourceUninterruptiblyIfInTaskThread(
new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
@@ -144,6 +277,46 @@ object TextInputCSVDataSource extends CSVDataSource {
UnivocityParser.parseIterator(lines, parser, headerChecker, requiredSchema)
}
+ /**
+ * Splits a single archive entry's bytes into CSV lines, mirroring the line semantics of
+ * `HadoopFileLinesReader` used by the non-archive [[readFile]] path above: lines are split with
+ * Hadoop's `LineReader` (honoring a custom line separator), decoded with the configured charset,
+ * and the separator is re-appended so `UnivocityParser` does not raise EOF on the final line.
+ *
+ * @param in bytes of one already-decompressed archive entry; not closed here (the archive owns
+ * the underlying stream).
+ * @param options CSV options supplying the read line separator and charset.
+ * @return an iterator over the entry's lines, each terminated with the line separator.
+ */
+ private def archiveLines(in: InputStream, options: CSVOptions): Iterator[String] = {
+ val lineReader = options.lineSeparatorInRead match {
+ case Some(sep) => new LineReader(in, sep)
+ case _ => new LineReader(in)
+ }
+ val newline = options.lineSeparatorInRead.getOrElse(
+ Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte))
+ new Iterator[String] {
+ private val text = new Text()
+ private var finished = false
+ private var hasValue = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !hasValue) {
+ finished = lineReader.readLine(text) == 0
+ hasValue = !finished
+ }
+ !finished
+ }
+
+ override def next(): String = {
+ if (!hasNext) throw new NoSuchElementException
+ hasValue = false
+ text.append(newline, 0, newline.length)
+ new String(text.getBytes, 0, text.getLength, options.charset)
+ }
+ }
+ }
+
override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
@@ -215,9 +388,19 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging {
override def readFile(
conf: Configuration,
file: PartitionedFile,
- parser: UnivocityParser,
- headerChecker: CSVHeaderChecker,
- requiredSchema: StructType): Iterator[InternalRow] = {
+ getParser: () => UnivocityParser,
+ getHeaderChecker: (Boolean, String) => CSVHeaderChecker,
+ requiredSchema: StructType,
+ archiveReadEnabled: Boolean): Iterator[InternalRow] = {
+ if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) {
+ // Stream each tar entry whole through the multi-line parser (a fresh parser/header checker
+ // is built per entry).
+ return readArchive(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) =>
+ UnivocityParser.parseStream(in, parser, headerChecker, requiredSchema)
+ }
+ }
+ val parser = getParser()
+ val headerChecker = getHeaderChecker(file.start == 0, s"CSV file: ${file.urlEncodedPath}")
headerChecker.setHeaderForSingleVariantColumn =
CSVDataSource.setHeaderForSingleVariantColumn(conf, file, parser)
UnivocityParser.parseStream(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 77a0c53ae4699..b7e04c3d9af03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -43,6 +44,12 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
+ // A tar archive is decompressed/unpacked as a sequential stream, so it must be read as a
+ // single split rather than carved into byte ranges.
+ if (getSqlConf(sparkSession).getConf(SQLConf.ARCHIVE_FORMAT_ENABLED) &&
+ ArchiveReader.isArchivePath(path)) {
+ return false
+ }
val parsedOptions = getCsvOptions(sparkSession, options)
CSVDataSource(parsedOptions).isSplitable && super.isSplitable(sparkSession, options, path)
}
@@ -99,6 +106,7 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
val parsedOptions = getCsvOptions(sparkSession, options)
val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled(requiredSchema)
+ val archiveReadEnabled = getSqlConf(sparkSession).getConf(SQLConf.ARCHIVE_FORMAT_ENABLED)
// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
@@ -119,24 +127,20 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val actualRequiredSchema = StructType(
requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val parser = new UnivocityParser(
- actualDataSchema,
- actualRequiredSchema,
- parsedOptions,
- actualFilters)
// Use column pruning when specified by Catalyst, except when one or more columns have
// existence default value(s), since in that case we instruct the CSV parser to disable column
// pruning and instead read each entire row in order to correctly assign the default value(s).
val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
- val isStartOfFile = file.start == 0
- val headerChecker = new CSVHeaderChecker(
- schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
+
+ def newParser(): UnivocityParser =
+ new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
+ def getHeaderChecker(isStartOfFile: Boolean, source: String): CSVHeaderChecker =
+ new CSVHeaderChecker(schema, parsedOptions, source, isStartOfFile)
+
+ // CSVDataSource.readFile parses the file directly, or streams it entry by entry when archive
+ // reads are enabled and `file` is a tar archive (a single split, see `isSplitable`).
CSVDataSource(parsedOptions).readFile(
- conf,
- file,
- parser,
- headerChecker,
- requiredSchema)
+ conf, file, () => newParser(), getHeaderChecker, requiredSchema, archiveReadEnabled)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index 65eff0647ee2b..f6bfc0eb1ed74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -53,25 +53,24 @@ case class CSVPartitionReaderFactory(
dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
val actualReadDataSchema = StructType(
readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
- val parser = new UnivocityParser(
- actualDataSchema,
- actualReadDataSchema,
- options,
- filters)
val schema = if (options.isColumnPruningEnabled(readDataSchema)) {
actualReadDataSchema
} else {
actualDataSchema
}
- val isStartOfFile = file.start == 0
- val headerChecker = new CSVHeaderChecker(
- schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
+ // The V2 CSV reader does not read tar archives, so a single parser and header checker are
+ // built per file.
+ val getParser: () => UnivocityParser = () =>
+ new UnivocityParser(actualDataSchema, actualReadDataSchema, options, filters)
+ val getHeaderChecker: (Boolean, String) => CSVHeaderChecker = (isStartOfFile, source) =>
+ new CSVHeaderChecker(schema, options, source, isStartOfFile)
val iter = CSVDataSource(options).readFile(
conf,
file,
- parser,
- headerChecker,
- readDataSchema)
+ getParser,
+ getHeaderChecker,
+ readDataSchema,
+ archiveReadEnabled = false)
val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
partitionSchema, file.partitionValues)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala
new file mode 100644
index 0000000000000..a6671535a1fe0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{File, FileOutputStream, OutputStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util.zip.GZIPOutputStream
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+/**
+ * End-to-end reads of `.tar` / `.tar.gz` / `.tgz` archives of CSV files through the streaming
+ * [[ArchiveReader]] path wired into `CSVFileFormat`. Entries are streamed (never unpacked to
+ * disk), and the central contract verified throughout is parity with reading the same files from
+ * a directory.
+ */
+class ArchiveReadSuite extends QueryTest with SharedSparkSession {
+
+ override def sparkConf: SparkConf =
+ super.sparkConf.set(SQLConf.ARCHIVE_FORMAT_ENABLED.key, "true")
+
+ import testImplicits._
+
+ // ----- helpers ------------------------------------------------------------
+
+ /**
+ * Provide an archive-extensioned file path inside a fresh temp dir. The extension is
+ * load-bearing: [[ArchiveReader.isArchivePath]] dispatches solely on the suffix.
+ */
+ private def withArchiveFile(extension: String = "tar")(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir(namePrefix = "archive-test")
+ try f(new File(dir, s"archive.$extension")) finally Utils.deleteRecursively(dir)
+ }
+
+ /** Backwards-compatible shorthand for the plain-`.tar` case. */
+ private def withTarFile(f: File => Unit): Unit = withArchiveFile("tar")(f)
+
+ private def writeTar(dest: File, entries: Seq[(String, Array[Byte])]): Unit =
+ writeTarTo(new FileOutputStream(dest), entries)
+
+ private def writeTarGz(dest: File, entries: Seq[(String, Array[Byte])]): Unit =
+ writeTarTo(new GZIPOutputStream(new FileOutputStream(dest)), entries)
+
+ private def writeTarTo(rawOut: OutputStream, entries: Seq[(String, Array[Byte])]): Unit = {
+ val out = new TarArchiveOutputStream(rawOut)
+ try {
+ entries.foreach { case (name, bytes) =>
+ val e = new TarArchiveEntry(name)
+ e.setSize(bytes.length.toLong)
+ out.putArchiveEntry(e)
+ out.write(bytes)
+ out.closeArchiveEntry()
+ }
+ out.finish()
+ } finally out.close()
+ }
+
+ private def bytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
+
+ /**
+ * Write `df` as a single CSV file to a temp dir, then return the bytes of the single produced
+ * data file. Used to build "real" CSV payloads for tarring.
+ */
+ private def encodeAsSingleCsv(
+ df: DataFrame,
+ writerOpts: Map[String, String] = Map.empty): Array[Byte] = {
+ val d = Utils.createTempDir(namePrefix = "archive-test-encode")
+ try {
+ var writer = df.coalesce(1).write.format("csv").mode("overwrite")
+ writerOpts.foreach { case (k, v) => writer = writer.option(k, v) }
+ writer.save(d.getCanonicalPath)
+ val parts = d.listFiles().filter { f =>
+ f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") &&
+ !f.getName.endsWith(".crc")
+ }
+ assert(parts.length == 1,
+ s"expected exactly one data file, got: ${parts.map(_.getName).toList}")
+ Files.readAllBytes(parts.head.toPath)
+ } finally Utils.deleteRecursively(d)
+ }
+
+ private val csvWriteOpts = Map("header" -> "true")
+
+ /** Reads `entries` from a tar and asserts the rows match reading them from a directory. */
+ private def assertTarMatchesDir(
+ entries: Seq[(String, Array[Byte])],
+ readerOpts: Map[String, String],
+ schema: String): Unit = {
+ withTarFile { tarPath =>
+ writeTar(tarPath, entries)
+ val readTar = readerOpts.foldLeft(spark.read) { case (r, (k, v)) => r.option(k, v) }
+ .schema(schema).csv(tarPath.getCanonicalPath)
+ withTempDir { dir =>
+ entries.foreach { case (name, b) => Files.write(new File(dir, name).toPath, b) }
+ val readDir = readerOpts.foldLeft(spark.read) { case (r, (k, v)) => r.option(k, v) }
+ .schema(schema).csv(dir.getCanonicalPath)
+ checkAnswer(readTar, readDir.collect().toSeq)
+ }
+ }
+ }
+
+ // ----- round-trips --------------------------------------------------------
+
+ test("CSV: read tar of multiple CSV entries with header") {
+ withTarFile { tarPath =>
+ val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
+ val df2 = Seq((3, "Carol")).toDF("id", "name")
+ val df3 = Seq((4, "Dan"), (5, "Eve")).toDF("id", "name")
+
+ val parts = Seq(df1, df2, df3).map(g => encodeAsSingleCsv(g, csvWriteOpts))
+ writeTar(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) })
+
+ val read = spark.read.option("header", "true")
+ .schema("id INT, name STRING").csv(tarPath.getCanonicalPath)
+ checkAnswer(read, df1.union(df2).union(df3))
+ }
+ }
+
+ test("CSV: read gzipped tar (.tar.gz): Hadoop codec auto-decompresses") {
+ withArchiveFile("tar.gz") { tarPath =>
+ val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
+ val df2 = Seq((3, "Carol")).toDF("id", "name")
+ val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts))
+ writeTarGz(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) })
+
+ val read = spark.read.option("header", "true")
+ .schema("id INT, name STRING").csv(tarPath.getCanonicalPath)
+ checkAnswer(read, df1.union(df2))
+ }
+ }
+
+ test("CSV: read gzipped tar (.tgz): explicit gunzip path") {
+ withArchiveFile("tgz") { tarPath =>
+ val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
+ val df2 = Seq((3, "Carol")).toDF("id", "name")
+ val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts))
+ writeTarGz(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) })
+
+ val read = spark.read.option("header", "true")
+ .schema("id INT, name STRING").csv(tarPath.getCanonicalPath)
+ checkAnswer(read, df1.union(df2))
+ }
+ }
+
+ // ----- parity with directory reads ----------------------------------------
+
+ test("CSV: tar entries with mismatched headers behave like standalone files") {
+ assertTarMatchesDir(
+ Seq(
+ "a.csv" -> bytes("id,name\n1,Alice\n2,Bob\n"),
+ "b.csv" -> bytes("id,nickname\n3,Carol\n")),
+ Map("header" -> "true"),
+ "id INT, name STRING")
+ }
+
+ test("CSV: headerless tar matches a directory read") {
+ assertTarMatchesDir(
+ Seq("a.csv" -> bytes("1,Alice\n2,Bob\n"), "b.csv" -> bytes("3,Carol\n")),
+ Map.empty,
+ "id INT, name STRING")
+ }
+
+ test("CSV: custom delimiter matches a directory read") {
+ assertTarMatchesDir(
+ Seq("a.csv" -> bytes("id;name\n1;Alice\n2;Bob\n")),
+ Map("header" -> "true", "delimiter" -> ";"),
+ "id INT, name STRING")
+ }
+
+ test("CSV: multiline quoted fields with embedded newlines match a directory read") {
+ assertTarMatchesDir(
+ Seq(
+ "a.csv" -> bytes("id,note\n1,\"line1\nline2\"\n2,\"plain\"\n"),
+ "b.csv" -> bytes("id,note\n3,\"a\nb\nc\"\n")),
+ Map("header" -> "true", "multiLine" -> "true"),
+ "id INT, note STRING")
+ }
+
+ test("CSV: column pruning selects a subset of columns") {
+ withTarFile { tarPath =>
+ val df = Seq((1, "Alice", 10), (2, "Bob", 20)).toDF("id", "name", "age")
+ writeTar(tarPath, Seq("p.csv" -> encodeAsSingleCsv(df, csvWriteOpts)))
+
+ val read = spark.read.option("header", "true")
+ .schema("id INT, name STRING, age INT").csv(tarPath.getCanonicalPath)
+ .select("name")
+ checkAnswer(read, Seq(Row("Alice"), Row("Bob")))
+ }
+ }
+
+ test("Tar combined with non-archive sibling under a partitioned dir layout") {
+ withTempDir { rootDir =>
+ val partitionDir = new File(rootDir, "dt=2024-01-01")
+ assert(partitionDir.mkdirs())
+
+ val df1 = Seq((1, "in-tar")).toDF("id", "v")
+ val df2 = Seq((2, "loose")).toDF("id", "v")
+
+ writeTar(
+ new File(partitionDir, "in-tar.tar"), Seq("p.csv" -> encodeAsSingleCsv(df1, csvWriteOpts)))
+ Files.write(new File(partitionDir, "loose.csv").toPath, encodeAsSingleCsv(df2, csvWriteOpts))
+
+ val read = spark.read.option("header", "true")
+ .schema("id INT, v STRING").csv(rootDir.getCanonicalPath)
+ val rows = read.select("id", "v", "dt").collect().map(_.toSeq.map(_.toString).toList).toSet
+ assert(rows == Set(
+ List("1", "in-tar", "2024-01-01"),
+ List("2", "loose", "2024-01-01")),
+ s"unexpected rows: $rows")
+ }
+ }
+
+ test("Splittability: a tar always yields a single partition regardless of size") {
+ withTarFile { tarPath =>
+ val df = (1 to 1000).map(i => (i, s"value-$i")).toDF("id", "v")
+ val parts = (1 to 4).map(_ => encodeAsSingleCsv(df, csvWriteOpts))
+ writeTar(tarPath, parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) })
+
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024") {
+ val readDf = spark.read.option("header", "true")
+ .schema("id INT, v STRING").csv(tarPath.getCanonicalPath)
+ assert(readDf.rdd.getNumPartitions == 1,
+ s"tar should be a single partition; got ${readDf.rdd.getNumPartitions}")
+ assert(readDf.count() == 4L * df.count())
+ }
+ }
+ }
+
+ // ----- schema inference ---------------------------------------------------
+
+ test("Inference parity: tar infers the same schema as a directory of CSVs") {
+ val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v")
+ val df2 = Seq((3, "c")).toDF("id", "v")
+ val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts))
+
+ val tarHolder = Utils.createTempDir(namePrefix = "archive-test-tar")
+ val dirHolder = Utils.createTempDir(namePrefix = "archive-test-dir")
+ try {
+ val tarFile = new File(tarHolder, "infer.tar")
+ writeTar(tarFile, parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) })
+ val tarSchema = spark.read.option("header", "true").option("inferSchema", "true")
+ .csv(tarFile.getCanonicalPath).schema
+
+ parts.zipWithIndex.foreach { case (b, i) =>
+ Files.write(new File(dirHolder, s"e-$i.csv").toPath, b)
+ }
+ val dirSchema = spark.read.option("header", "true").option("inferSchema", "true")
+ .csv(dirHolder.getCanonicalPath).schema
+
+ assert(tarSchema == dirSchema,
+ s"schema inference parity broken; tar=$tarSchema dir=$dirSchema")
+ val expected = StructType(Seq(
+ StructField("id", IntegerType, nullable = true),
+ StructField("v", StringType, nullable = true)))
+ assert(tarSchema == expected, s"unexpected inferred schema: $tarSchema")
+ } finally {
+ Utils.deleteRecursively(tarHolder)
+ Utils.deleteRecursively(dirHolder)
+ }
+ }
+
+ test("Inference: .tar.gz infers the same schema as the equivalent .tar") {
+ val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v")
+ val df2 = Seq((3, "c")).toDF("id", "v")
+ val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts))
+ val entries = parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) }
+
+ val plainHolder = Utils.createTempDir(namePrefix = "archive-test-tar")
+ val gzHolder = Utils.createTempDir(namePrefix = "archive-test-targz")
+ try {
+ val plainTar = new File(plainHolder, "infer.tar")
+ writeTar(plainTar, entries)
+ val gzTar = new File(gzHolder, "infer.tar.gz")
+ writeTarGz(gzTar, entries)
+
+ val plainSchema = spark.read.option("header", "true").option("inferSchema", "true")
+ .csv(plainTar.getCanonicalPath).schema
+ val gzSchema = spark.read.option("header", "true").option("inferSchema", "true")
+ .csv(gzTar.getCanonicalPath).schema
+ assert(plainSchema == gzSchema,
+ s"schema inference parity broken; tar=$plainSchema tar.gz=$gzSchema")
+ } finally {
+ Utils.deleteRecursively(plainHolder)
+ Utils.deleteRecursively(gzHolder)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala
new file mode 100644
index 0000000000000..551e98b510abb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{ByteArrayOutputStream, Closeable, File, FileOutputStream, InputStream, OutputStream}
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+import java.util.zip.GZIPOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{SparkFunSuite, TaskContext, TaskContextImpl}
+
+/**
+ * Unit tests for the streaming [[ArchiveReader]] core: `isArchivePath` dispatch and `readEntries`
+ * (entry ordering, gzip handling, dir/dotfile skipping, lazy advance, the non-closing entry
+ * stream, and cleanup). Nothing here touches local disk -- entries are consumed as streams.
+ */
+class ArchiveReaderSuite extends SparkFunSuite {
+
+ private case class Entry(name: String, data: Array[Byte], isDir: Boolean = false)
+
+ private def writeTar(file: File, entries: Seq[Entry]): Unit =
+ writeTarTo(new FileOutputStream(file), entries)
+
+ /** Write a gzipped tar, used to verify the `.tar.gz` / `.tgz` archive paths. */
+ private def writeTarGz(file: File, entries: Seq[Entry]): Unit =
+ writeTarTo(new GZIPOutputStream(new FileOutputStream(file)), entries)
+
+ private def writeTarTo(rawOut: OutputStream, entries: Seq[Entry]): Unit = {
+ val out = new TarArchiveOutputStream(rawOut)
+ try {
+ entries.foreach { e =>
+ // TarArchiveEntry treats a trailing slash in the name as a directory marker.
+ val rawName = if (e.isDir && !e.name.endsWith("/")) e.name + "/" else e.name
+ val tarEntry = new TarArchiveEntry(rawName)
+ if (!e.isDir) tarEntry.setSize(e.data.length.toLong)
+ out.putArchiveEntry(tarEntry)
+ if (!e.isDir) out.write(e.data)
+ out.closeArchiveEntry()
+ }
+ out.finish()
+ } finally out.close()
+ }
+
+ private def textEntry(name: String, body: String): Entry =
+ Entry(name, body.getBytes(StandardCharsets.UTF_8))
+
+ private def readAll(in: InputStream): Array[Byte] = {
+ val out = new ByteArrayOutputStream()
+ val buf = new Array[Byte](4096)
+ var n = in.read(buf)
+ while (n >= 0) {
+ out.write(buf, 0, n)
+ n = in.read(buf)
+ }
+ out.toByteArray
+ }
+
+ /** Drains every entry into `(name, decodedText)` pairs through `ArchiveReader.readEntries`. */
+ private def collect(file: File): Seq[(String, String)] =
+ ArchiveReader.readEntries(new Path(file.toURI), new Configuration()) { (name, in) =>
+ Iterator.single((name, new String(readAll(in), StandardCharsets.UTF_8)))
+ }.toList
+
+ // ----- isArchivePath ------------------------------------------------------
+
+ test("isArchivePath: positive cases") {
+ Seq(
+ "foo.tar", "FOO.TAR", "/a/b/c/x.tar", "weird.TaR",
+ "foo.tar.gz", "FOO.TAR.GZ", "mixed.Tar.Gz", "/a/b/c/x.tar.gz",
+ "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz"
+ ).foreach { p =>
+ assert(ArchiveReader.isArchivePath(new Path(p)), s"expected archive match for $p")
+ }
+ }
+
+ test("isArchivePath: negative cases") {
+ Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", "data.zip",
+ "foo.tar.bz2", "foo.targz").foreach { p =>
+ assert(!ArchiveReader.isArchivePath(new Path(p)), s"expected non-match for $p")
+ }
+ }
+
+ // ----- readEntries --------------------------------------------------------
+
+ test("readEntries: empty tar yields empty iterator") {
+ withTempDir { dir =>
+ val tar = new File(dir, "empty.tar")
+ writeTar(tar, Seq.empty)
+ assert(collect(tar).isEmpty)
+ }
+ }
+
+ test("readEntries: single entry exposes its name and bytes") {
+ withTempDir { dir =>
+ val tar = new File(dir, "single.tar")
+ writeTar(tar, Seq(textEntry("only.csv", "hello\n")))
+ assert(collect(tar) == Seq("only.csv" -> "hello\n"))
+ }
+ }
+
+ test("readEntries: multiple entries chained in tar order") {
+ withTempDir { dir =>
+ val tar = new File(dir, "multi.tar")
+ writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c")))
+ assert(collect(tar) == Seq("a.csv" -> "a", "b.csv" -> "b", "c.csv" -> "c"))
+ }
+ }
+
+ test("readEntries: gzipped tar (.tar.gz) via Hadoop codec factory") {
+ withTempDir { dir =>
+ val tarGz = new File(dir, "data.tar.gz")
+ writeTarGz(tarGz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+ assert(collect(tarGz) == Seq("a.csv" -> "a", "b.csv" -> "b"))
+ }
+ }
+
+ test("readEntries: gzipped tar (.tgz) via explicit GZIPInputStream wrap") {
+ withTempDir { dir =>
+ val tgz = new File(dir, "data.tgz")
+ writeTarGz(tgz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+ assert(collect(tgz) == Seq("a.csv" -> "a", "b.csv" -> "b"))
+ }
+ }
+
+ test("readEntries: directory entries are skipped") {
+ withTempDir { dir =>
+ val tar = new File(dir, "dirs.tar")
+ writeTar(tar, Seq(
+ Entry("subdir", Array.emptyByteArray, isDir = true),
+ textEntry("subdir/data.csv", "x")))
+ assert(collect(tar) == Seq("subdir/data.csv" -> "x"))
+ }
+ }
+
+ test("readEntries: dotfile entries (e.g. macOS ._foo) are skipped") {
+ withTempDir { dir =>
+ val tar = new File(dir, "dots.tar")
+ writeTar(tar, Seq(
+ textEntry("._real.csv", "junk"), // macOS AppleDouble sidecar
+ textEntry(".hidden", "ignored"), // bare dotfile
+ textEntry("real.csv", "kept"),
+ textEntry("nested/._sidecar", "junk2"))) // dotfile in a subdir
+ assert(collect(tar) == Seq("real.csv" -> "kept"))
+ }
+ }
+
+ test("readEntries: advances lazily, one entry at a time") {
+ withTempDir { dir =>
+ val tar = new File(dir, "lazy.tar")
+ writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c")))
+
+ val opened = ArrayBuffer[String]()
+ // parseEntry yields a single element without reading the stream, so each invocation maps to
+ // exactly one consumed output element -- letting us observe when the next entry is opened.
+ val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) =>
+ opened += name
+ Iterator.single(name)
+ }
+
+ // Construction opens only the first entry; later entries open on demand as iteration
+ // crosses each entry boundary (never all upfront).
+ assert(opened.toList == List("a.csv"))
+ assert(it.hasNext)
+ assert(it.next() == "a.csv")
+ // Entry 0 is still in flight until its element is consumed, so entry 1 stays unopened.
+ assert(opened.toList == List("a.csv"))
+ assert(it.next() == "b.csv")
+ assert(opened.toList == List("a.csv", "b.csv"))
+ assert(it.next() == "c.csv")
+ assert(opened.toList == List("a.csv", "b.csv", "c.csv"))
+ assert(!it.hasNext)
+ assert(opened.size == 3)
+ }
+ }
+
+ test("readEntries: a parseEntry that closes its stream still advances to the next entry") {
+ withTempDir { dir =>
+ val tar = new File(dir, "close.tar")
+ writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+
+ val seen = ArrayBuffer[String]()
+ val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, in) =>
+ val body = new String(readAll(in), StandardCharsets.UTF_8)
+ in.close() // must NOT close the underlying archive
+ seen += body
+ Iterator.single(name)
+ }
+ assert(it.toList == List("a.csv", "b.csv"))
+ assert(seen.toList == List("a", "b"))
+ }
+ }
+
+ test("readEntries: close() is safe, idempotent, and stops iteration") {
+ withTempDir { dir =>
+ val tar = new File(dir, "closeable.tar")
+ writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+
+ val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) =>
+ Iterator.single(name)
+ }
+ assert(it.hasNext)
+ it.asInstanceOf[Closeable].close()
+ it.asInstanceOf[Closeable].close() // idempotent
+ assert(!it.hasNext)
+ }
+ }
+
+ test("readEntries: TaskContext completion cleans up without error") {
+ withTempDir { dir =>
+ val tar = new File(dir, "ctx.tar")
+ writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b")))
+
+ val ctx = new TaskContextImpl(
+ stageId = 0,
+ stageAttemptNumber = 0,
+ partitionId = 0,
+ taskAttemptId = 1L,
+ attemptNumber = 0,
+ numPartitions = 0,
+ taskMemoryManager = null,
+ localProperties = new Properties,
+ metricsSystem = null)
+ TaskContext.setTaskContext(ctx)
+ try {
+ val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) =>
+ Iterator.single(name)
+ }
+ assert(it.hasNext)
+ it.next() // open the archive and register the completion listener
+ // Simulate task completion without exhausting/closing the iterator.
+ ctx.markTaskCompleted(None)
+ } finally {
+ TaskContext.unset()
+ }
+ }
+ }
+}