From 670e233c151e43fb16d65762b25d65f03c050235 Mon Sep 17 00:00:00 2001 From: Akshat Shenoi Date: Fri, 29 May 2026 18:47:25 +0000 Subject: [PATCH 1/2] [SPARK-57135][SQL] Support reading CSV files inside tar archives --- .../apache/spark/sql/internal/SQLConf.scala | 10 + sql/core/pom.xml | 4 + .../execution/datasources/ArchiveReader.scala | 161 +++++++++ .../datasources/csv/CSVDataSource.scala | 89 ++++- .../datasources/csv/CSVFileFormat.scala | 96 +++++- .../datasources/ArchiveReadSuite.scala | 310 ++++++++++++++++++ .../datasources/ArchiveReaderSuite.scala | 258 +++++++++++++++ 7 files changed, 912 insertions(+), 16 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0aed28e92558f..99c00cb60caf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2695,6 +2695,16 @@ object SQLConf { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("128MB") // parquet.block.size + val ARCHIVE_FORMAT_ENABLED = buildConf("spark.sql.files.archive.enabled") + .doc("When true, the CSV data source can read tar archives (.tar, .tar.gz, .tgz): each " + + "archive is read as a single split and its entries are streamed through the CSV parser " + + "(never unpacked to disk), as if the entries were separate CSV files. 
Only the CSV data " + + "source supports reading archives.") + .version("4.3.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes") .internal() .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" + diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 1261200a9173c..e6673c9069f42 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -293,6 +293,10 @@ bcprov-jdk18on test + + org.apache.commons + commons-compress + org.bouncycastle bcpkix-jdk18on diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala new file mode 100644 index 0000000000000..ce14d45d540f4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{Closeable, FilterInputStream, InputStream} +import java.util.Locale +import java.util.zip.GZIPInputStream + +import scala.util.control.NonFatal + +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.TaskContext + +/** + * Streaming reader for tar archives: plain `.tar`, gzipped `.tar.gz`, and `.tgz`. + * + * The archive is opened once and decompressed/unpacked as a stream -- entries are never + * materialized to local disk. [[readEntries]] hands each entry's bytes to a caller-supplied + * parse function as a bounded [[InputStream]] and concatenates the per-entry results into a + * single iterator, advancing to the next entry only once the current one is fully consumed. At + * most one entry is in flight at a time, so memory stays bounded regardless of archive size. + * + * This is format-agnostic: a `FileFormat` whose per-file reader can consume an `InputStream` + * (e.g. CSV via `UnivocityParser`) wires up archive support by calling [[readEntries]] from its + * read/inference paths and supplying a `parseEntry` that turns one entry stream into rows (or + * tokens). Formats that need random access within a file (Parquet/ORC footers) cannot use this + * streaming path. + * + * Gzip handling: Hadoop's `CompressionCodecFactory` matches the trailing `.gz` extension and + * auto-decompresses `.tar.gz` via `CodecStreams`, so we just wrap that stream in + * `TarArchiveInputStream`. `.tgz` is not a registered Hadoop codec extension, so the gzip layer + * is unwrapped explicitly here. 
+ */ +object ArchiveReader { + + def isArchivePath(path: Path): Boolean = { + val name = path.getName.toLowerCase(Locale.ROOT) + name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") + } + + // Paths Hadoop's codec factory won't auto-decompress: we apply the gzip layer here. + private def needsExplicitGunzip(path: Path): Boolean = + path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz") + + private def shouldSkipEntry(entry: TarArchiveEntry): Boolean = { + if (entry.isDirectory) return true + val name = entry.getName + val basename = name.substring(name.lastIndexOf('/') + 1) + basename.startsWith(".") + } + + /** Opens `path` as a tar stream, transparently decompressing `.tar.gz` / `.tgz`. */ + private def openTarStream(conf: Configuration, path: Path): TarArchiveInputStream = { + val base = CodecStreams.createInputStreamWithCloseResource(conf, path) + val tarBytes = if (needsExplicitGunzip(path)) new GZIPInputStream(base) else base + new TarArchiveInputStream(tarBytes) + } + + /** + * A view over the shared tar stream that reads exactly the current entry's bytes + * (`TarArchiveInputStream.read` returns -1 at the entry boundary) and ignores `close()`, so a + * parser closing its input does not close the underlying archive. Any unread remainder of an + * entry is skipped by `getNextEntry()` when advancing. + */ + private final class EntryInputStream(tar: TarArchiveInputStream) + extends FilterInputStream(tar) { + override def close(): Unit = () + } + + /** + * Streams `path` entry by entry, applying `parseEntry` to each non-skipped entry's + * `(name, stream)` and concatenating the results into a single iterator. Directories and OS + * sidecar dotfiles (basename starting with `.`, e.g. macOS `._x` / `.DS_Store`) are skipped. + * + * The next entry is opened only once the current entry's iterator is exhausted, so nothing is + * buffered to disk and at most one entry's bytes are read at a time. 
The archive stream is + * closed when the returned iterator is exhausted, when [[Closeable.close]] is called on it, and + * (defensively) on task completion. + */ + def readEntries[T]( + path: Path, + conf: Configuration)( + parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = { + val tar = openTarStream(conf, path) + var closed = false + + def cleanup(): Unit = { + if (!closed) { + closed = true + try tar.close() catch { case NonFatal(_) => } + } + } + + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup())) + + new Iterator[T] with Closeable { + private var currentIter: Iterator[T] = Iterator.empty + private var done = false + + // Move to the next entry whose iterator has elements (releasing each exhausted entry's + // reader and skipping any unread bytes), or mark the stream done once entries run out. + // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in + // `next` is essential for parsers like `UnivocityParser` that reuse a single mutable row and + // look ahead on `hasNext`: probing the current entry right after returning a row would + // overwrite that row's contents before the caller has copied it. + private def advance(): Unit = { + while (!done && !currentIter.hasNext) { + currentIter match { + case c: Closeable => try c.close() catch { case NonFatal(_) => } + case _ => + } + var entry = tar.getNextEntry + while (entry != null && shouldSkipEntry(entry)) entry = tar.getNextEntry + if (entry == null) { + done = true + cleanup() + } else { + currentIter = parseEntry(entry.getName, new EntryInputStream(tar)) + } + } + } + + // Open the first entry eagerly so construction reflects the archive's first entry. 
+ advance() + + override def hasNext: Boolean = { + advance() + !done && currentIter.hasNext + } + + override def next(): T = { + if (!hasNext) throw new NoSuchElementException + currentIter.next() + } + + override def close(): Unit = { + done = true + currentIter = Iterator.empty + cleanup() + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 596edc8beaa34..6255df5d3ae35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -71,8 +72,24 @@ abstract class CSVDataSource extends Serializable { parsedOptions.singleVariantColumn match { case Some(columnName) => Some(StructType(Array(StructField(columnName, VariantType)))) case None => - if (inputPaths.nonEmpty) { - Some(infer(sparkSession, inputPaths, parsedOptions)) + // Tar archives are inferred by streaming their entries (never unpacked to disk); any + // non-archive files are inferred normally and the two schemas are merged. 
+ val (archives, nonArchives) = + if (sparkSession.sessionState.conf.getConf(SQLConf.ARCHIVE_FORMAT_ENABLED)) { + inputPaths.partition(f => ArchiveReader.isArchivePath(f.getPath)) + } else { + (Seq.empty[FileStatus], inputPaths) + } + if (archives.nonEmpty) { + val archiveSchema = inferArchives(sparkSession, archives, parsedOptions) + if (nonArchives.nonEmpty) { + Some(mergeSchemas( + infer(sparkSession, nonArchives, parsedOptions), archiveSchema, parsedOptions)) + } else { + Some(archiveSchema) + } + } else if (nonArchives.nonEmpty) { + Some(infer(sparkSession, nonArchives, parsedOptions)) } else { None } @@ -83,6 +100,74 @@ abstract class CSVDataSource extends Serializable { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: CSVOptions): StructType + + /** + * Infers a CSV schema from tar archives (`.tar`/`.tar.gz`/`.tgz`) by streaming their entries -- + * the archive is never unpacked to disk. Each entry is tokenized like a standalone CSV file (its + * header row dropped when `header` is set), and all entries' rows feed a single + * [[CSVInferSchema]] pass keyed on the first entry's header, so the result matches reading the + * entries as separate files. Mirrors [[MultiLineCSVDataSource.infer]] but iterates tar entries. 
+ */ + private def inferArchives( + sparkSession: SparkSession, + archives: Seq[FileStatus], + parsedOptions: CSVOptions): StructType = { + val archiveRdd = createArchiveBaseRdd(sparkSession, archives, parsedOptions) + def tokens(dropHeader: Boolean): RDD[Array[String]] = archiveRdd.flatMap { stream => + val path = new Path(stream.getPath()) + ArchiveReader.readEntries(path, stream.getConfiguration) { (_, in) => + UnivocityParser.tokenizeStream( + in, dropHeader, new CsvParser(parsedOptions.asParserSettings), parsedOptions.charset) + } + } + tokens(dropHeader = false).take(1).headOption match { + case Some(firstRow) => + val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) + val sampled = CSVUtils.sample(tokens(dropHeader = parsedOptions.headerFlag), parsedOptions) + SQLExecution.withSQLConfPropagated(sparkSession) { + new CSVInferSchema(parsedOptions).infer(sampled, header) + } + case None => + StructType(Nil) + } + } + + /** One `PortableDataStream` per archive file, like `MultiLineCSVDataSource.createBaseRdd`. 
*/ + private def createArchiveBaseRdd( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + options: CSVOptions): RDD[PortableDataStream] = { + val paths = inputPaths.map(_.getPath) + val name = paths.mkString(",") + val job = Job.getInstance( + sparkSession.sessionState.newHadoopConfWithOptions(options.parameters)) + FileInputFormat.setInputPaths(job, paths: _*) + val rdd = new BinaryFileRDD( + sparkSession.sparkContext, + classOf[StreamInputFormat], + classOf[String], + classOf[PortableDataStream], + job.getConfiguration, + sparkSession.sparkContext.defaultMinPartitions) + rdd.setName(s"CSVArchiveFile: $name").values + } + + /** + * Positionally merges two already-inferred CSV schemas (the non-archive vs archive results), + * widening per-column types via [[CSVInferSchema.mergeRowTypes]] and keeping the longer header -- + * the same first-header, positional model used for a multi-file CSV read. + */ + private def mergeSchemas( + left: StructType, + right: StructType, + parsedOptions: CSVOptions): StructType = { + val inferSchema = new CSVInferSchema(parsedOptions) + val mergedTypes = + inferSchema.mergeRowTypes(left.map(_.dataType).toArray, right.map(_.dataType).toArray) + val header = (if (left.length >= right.length) left else right).map(_.name).toArray + StructType(inferSchema.toStructFields(mergedTypes, header)) + } } object CSVDataSource extends Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 77a0c53ae4699..5c800181b69ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.execution.datasources.csv +import java.io.InputStream + import org.apache.hadoop.conf.Configuration import 
org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.util.LineReader import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -28,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -43,6 +48,12 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = { + // A tar archive is decompressed/unpacked as a sequential stream, so it must be read as a + // single split rather than carved into byte ranges. + if (getSqlConf(sparkSession).getConf(SQLConf.ARCHIVE_FORMAT_ENABLED) && + ArchiveReader.isArchivePath(path)) { + return false + } val parsedOptions = getCsvOptions(sparkSession, options) CSVDataSource(parsedOptions).isSplitable && super.isSplitable(sparkSession, options, path) } @@ -99,6 +110,7 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf) val parsedOptions = getCsvOptions(sparkSession, options) val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled(requiredSchema) + val archiveReadEnabled = getSqlConf(sparkSession).getConf(SQLConf.ARCHIVE_FORMAT_ENABLED) // Check a field requirement for corrupt records here to throw an exception in a driver side ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord) @@ -119,24 +131,45 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { 
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) val actualRequiredSchema = StructType( requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) - val parser = new UnivocityParser( - actualDataSchema, - actualRequiredSchema, - parsedOptions, - actualFilters) // Use column pruning when specified by Catalyst, except when one or more columns have // existence default value(s), since in that case we instruct the CSV parser to disable column // pruning and instead read each entire row in order to correctly assign the default value(s). val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema - val isStartOfFile = file.start == 0 - val headerChecker = new CSVHeaderChecker( - schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) - CSVDataSource(parsedOptions).readFile( - conf, - file, - parser, - headerChecker, - requiredSchema) + + def newParser(): UnivocityParser = + new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters) + + if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) { + // The whole archive is a single split (see `isSplitable`). Stream each entry through the + // CSV parser without materializing it to disk, building a fresh parser/header checker per + // entry. Each entry is treated as the start of its own file, so its header (if any) is + // handled exactly as it would be for a standalone CSV file. 
+ ArchiveReader.readEntries(file.toPath, conf) { (entryName, in) => + val parser = newParser() + val headerChecker = new CSVHeaderChecker( + schema, + parsedOptions, + source = s"CSV archive entry: ${file.urlEncodedPath}!/$entryName", + isStartOfFile = true) + if (parsedOptions.multiLine) { + UnivocityParser.parseStream(in, parser, headerChecker, requiredSchema) + } else { + UnivocityParser.parseIterator( + archiveLines(in, parsedOptions), parser, headerChecker, requiredSchema) + } + } + } else { + val parser = newParser() + val isStartOfFile = file.start == 0 + val headerChecker = new CSVHeaderChecker( + schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) + CSVDataSource(parsedOptions).readFile( + conf, + file, + parser, + headerChecker, + requiredSchema) + } } } @@ -176,4 +209,39 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { conf.sessionLocalTimeZone, conf.columnNameOfCorruptRecord) } + + /** + * Splits a single archive entry's bytes into CSV lines, mirroring the line semantics of + * `HadoopFileLinesReader` used by [[TextInputCSVDataSource.readFile]]: lines are split with + * Hadoop's `LineReader` (honoring a custom line separator), decoded with the configured charset, + * and the separator is re-appended so `UnivocityParser` does not raise EOF on the final line. 
+ */ + private def archiveLines(in: InputStream, options: CSVOptions): Iterator[String] = { + val lineReader = options.lineSeparatorInRead match { + case Some(sep) => new LineReader(in, sep) + case _ => new LineReader(in) + } + val newline = options.lineSeparatorInRead.getOrElse( + Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte)) + new Iterator[String] { + private val text = new Text() + private var finished = false + private var hasValue = false + + override def hasNext: Boolean = { + if (!finished && !hasValue) { + finished = lineReader.readLine(text) == 0 + hasValue = !finished + } + !finished + } + + override def next(): String = { + if (!hasNext) throw new NoSuchElementException + hasValue = false + text.append(newline, 0, newline.length) + new String(text.getBytes, 0, text.getLength, options.charset) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala new file mode 100644 index 0000000000000..a6671535a1fe0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuite.scala @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{File, FileOutputStream, OutputStream} +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.util.zip.GZIPOutputStream + +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.util.Utils + +/** + * End-to-end reads of `.tar` / `.tar.gz` / `.tgz` archives of CSV files through the streaming + * [[ArchiveReader]] path wired into `CSVFileFormat`. Entries are streamed (never unpacked to + * disk), and the central contract verified throughout is parity with reading the same files from + * a directory. + */ +class ArchiveReadSuite extends QueryTest with SharedSparkSession { + + override def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.ARCHIVE_FORMAT_ENABLED.key, "true") + + import testImplicits._ + + // ----- helpers ------------------------------------------------------------ + + /** + * Provide an archive-extensioned file path inside a fresh temp dir. The extension is + * load-bearing: [[ArchiveReader.isArchivePath]] dispatches solely on the suffix. + */ + private def withArchiveFile(extension: String = "tar")(f: File => Unit): Unit = { + val dir = Utils.createTempDir(namePrefix = "archive-test") + try f(new File(dir, s"archive.$extension")) finally Utils.deleteRecursively(dir) + } + + /** Backwards-compatible shorthand for the plain-`.tar` case. 
*/ + private def withTarFile(f: File => Unit): Unit = withArchiveFile("tar")(f) + + private def writeTar(dest: File, entries: Seq[(String, Array[Byte])]): Unit = + writeTarTo(new FileOutputStream(dest), entries) + + private def writeTarGz(dest: File, entries: Seq[(String, Array[Byte])]): Unit = + writeTarTo(new GZIPOutputStream(new FileOutputStream(dest)), entries) + + private def writeTarTo(rawOut: OutputStream, entries: Seq[(String, Array[Byte])]): Unit = { + val out = new TarArchiveOutputStream(rawOut) + try { + entries.foreach { case (name, bytes) => + val e = new TarArchiveEntry(name) + e.setSize(bytes.length.toLong) + out.putArchiveEntry(e) + out.write(bytes) + out.closeArchiveEntry() + } + out.finish() + } finally out.close() + } + + private def bytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8) + + /** + * Write `df` as a single CSV file to a temp dir, then return the bytes of the single produced + * data file. Used to build "real" CSV payloads for tarring. + */ + private def encodeAsSingleCsv( + df: DataFrame, + writerOpts: Map[String, String] = Map.empty): Array[Byte] = { + val d = Utils.createTempDir(namePrefix = "archive-test-encode") + try { + var writer = df.coalesce(1).write.format("csv").mode("overwrite") + writerOpts.foreach { case (k, v) => writer = writer.option(k, v) } + writer.save(d.getCanonicalPath) + val parts = d.listFiles().filter { f => + f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") && + !f.getName.endsWith(".crc") + } + assert(parts.length == 1, + s"expected exactly one data file, got: ${parts.map(_.getName).toList}") + Files.readAllBytes(parts.head.toPath) + } finally Utils.deleteRecursively(d) + } + + private val csvWriteOpts = Map("header" -> "true") + + /** Reads `entries` from a tar and asserts the rows match reading them from a directory. 
*/ + private def assertTarMatchesDir( + entries: Seq[(String, Array[Byte])], + readerOpts: Map[String, String], + schema: String): Unit = { + withTarFile { tarPath => + writeTar(tarPath, entries) + val readTar = readerOpts.foldLeft(spark.read) { case (r, (k, v)) => r.option(k, v) } + .schema(schema).csv(tarPath.getCanonicalPath) + withTempDir { dir => + entries.foreach { case (name, b) => Files.write(new File(dir, name).toPath, b) } + val readDir = readerOpts.foldLeft(spark.read) { case (r, (k, v)) => r.option(k, v) } + .schema(schema).csv(dir.getCanonicalPath) + checkAnswer(readTar, readDir.collect().toSeq) + } + } + } + + // ----- round-trips -------------------------------------------------------- + + test("CSV: read tar of multiple CSV entries with header") { + withTarFile { tarPath => + val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name") + val df2 = Seq((3, "Carol")).toDF("id", "name") + val df3 = Seq((4, "Dan"), (5, "Eve")).toDF("id", "name") + + val parts = Seq(df1, df2, df3).map(g => encodeAsSingleCsv(g, csvWriteOpts)) + writeTar(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) }) + + val read = spark.read.option("header", "true") + .schema("id INT, name STRING").csv(tarPath.getCanonicalPath) + checkAnswer(read, df1.union(df2).union(df3)) + } + } + + test("CSV: read gzipped tar (.tar.gz): Hadoop codec auto-decompresses") { + withArchiveFile("tar.gz") { tarPath => + val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name") + val df2 = Seq((3, "Carol")).toDF("id", "name") + val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts)) + writeTarGz(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) }) + + val read = spark.read.option("header", "true") + .schema("id INT, name STRING").csv(tarPath.getCanonicalPath) + checkAnswer(read, df1.union(df2)) + } + } + + test("CSV: read gzipped tar (.tgz): explicit gunzip path") { + withArchiveFile("tgz") { tarPath => + val df1 = Seq((1, "Alice"), (2, 
"Bob")).toDF("id", "name") + val df2 = Seq((3, "Carol")).toDF("id", "name") + val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts)) + writeTarGz(tarPath, parts.zipWithIndex.map { case (b, i) => (s"part-$i.csv", b) }) + + val read = spark.read.option("header", "true") + .schema("id INT, name STRING").csv(tarPath.getCanonicalPath) + checkAnswer(read, df1.union(df2)) + } + } + + // ----- parity with directory reads ---------------------------------------- + + test("CSV: tar entries with mismatched headers behave like standalone files") { + assertTarMatchesDir( + Seq( + "a.csv" -> bytes("id,name\n1,Alice\n2,Bob\n"), + "b.csv" -> bytes("id,nickname\n3,Carol\n")), + Map("header" -> "true"), + "id INT, name STRING") + } + + test("CSV: headerless tar matches a directory read") { + assertTarMatchesDir( + Seq("a.csv" -> bytes("1,Alice\n2,Bob\n"), "b.csv" -> bytes("3,Carol\n")), + Map.empty, + "id INT, name STRING") + } + + test("CSV: custom delimiter matches a directory read") { + assertTarMatchesDir( + Seq("a.csv" -> bytes("id;name\n1;Alice\n2;Bob\n")), + Map("header" -> "true", "delimiter" -> ";"), + "id INT, name STRING") + } + + test("CSV: multiline quoted fields with embedded newlines match a directory read") { + assertTarMatchesDir( + Seq( + "a.csv" -> bytes("id,note\n1,\"line1\nline2\"\n2,\"plain\"\n"), + "b.csv" -> bytes("id,note\n3,\"a\nb\nc\"\n")), + Map("header" -> "true", "multiLine" -> "true"), + "id INT, note STRING") + } + + test("CSV: column pruning selects a subset of columns") { + withTarFile { tarPath => + val df = Seq((1, "Alice", 10), (2, "Bob", 20)).toDF("id", "name", "age") + writeTar(tarPath, Seq("p.csv" -> encodeAsSingleCsv(df, csvWriteOpts))) + + val read = spark.read.option("header", "true") + .schema("id INT, name STRING, age INT").csv(tarPath.getCanonicalPath) + .select("name") + checkAnswer(read, Seq(Row("Alice"), Row("Bob"))) + } + } + + test("Tar combined with non-archive sibling under a partitioned dir layout") { + 
withTempDir { rootDir => + val partitionDir = new File(rootDir, "dt=2024-01-01") + assert(partitionDir.mkdirs()) + + val df1 = Seq((1, "in-tar")).toDF("id", "v") + val df2 = Seq((2, "loose")).toDF("id", "v") + + writeTar( + new File(partitionDir, "in-tar.tar"), Seq("p.csv" -> encodeAsSingleCsv(df1, csvWriteOpts))) + Files.write(new File(partitionDir, "loose.csv").toPath, encodeAsSingleCsv(df2, csvWriteOpts)) + + val read = spark.read.option("header", "true") + .schema("id INT, v STRING").csv(rootDir.getCanonicalPath) + val rows = read.select("id", "v", "dt").collect().map(_.toSeq.map(_.toString).toList).toSet + assert(rows == Set( + List("1", "in-tar", "2024-01-01"), + List("2", "loose", "2024-01-01")), + s"unexpected rows: $rows") + } + } + + test("Splittability: a tar always yields a single partition regardless of size") { + withTarFile { tarPath => + val df = (1 to 1000).map(i => (i, s"value-$i")).toDF("id", "v") + val parts = (1 to 4).map(_ => encodeAsSingleCsv(df, csvWriteOpts)) + writeTar(tarPath, parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) }) + + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024") { + val readDf = spark.read.option("header", "true") + .schema("id INT, v STRING").csv(tarPath.getCanonicalPath) + assert(readDf.rdd.getNumPartitions == 1, + s"tar should be a single partition; got ${readDf.rdd.getNumPartitions}") + assert(readDf.count() == 4L * df.count()) + } + } + } + + // ----- schema inference --------------------------------------------------- + + test("Inference parity: tar infers the same schema as a directory of CSVs") { + val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v") + val df2 = Seq((3, "c")).toDF("id", "v") + val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts)) + + val tarHolder = Utils.createTempDir(namePrefix = "archive-test-tar") + val dirHolder = Utils.createTempDir(namePrefix = "archive-test-dir") + try { + val tarFile = new File(tarHolder, "infer.tar") + writeTar(tarFile, 
parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) }) + val tarSchema = spark.read.option("header", "true").option("inferSchema", "true") + .csv(tarFile.getCanonicalPath).schema + + parts.zipWithIndex.foreach { case (b, i) => + Files.write(new File(dirHolder, s"e-$i.csv").toPath, b) + } + val dirSchema = spark.read.option("header", "true").option("inferSchema", "true") + .csv(dirHolder.getCanonicalPath).schema + + assert(tarSchema == dirSchema, + s"schema inference parity broken; tar=$tarSchema dir=$dirSchema") + val expected = StructType(Seq( + StructField("id", IntegerType, nullable = true), + StructField("v", StringType, nullable = true))) + assert(tarSchema == expected, s"unexpected inferred schema: $tarSchema") + } finally { + Utils.deleteRecursively(tarHolder) + Utils.deleteRecursively(dirHolder) + } + } + + test("Inference: .tar.gz infers the same schema as the equivalent .tar") { + val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v") + val df2 = Seq((3, "c")).toDF("id", "v") + val parts = Seq(df1, df2).map(g => encodeAsSingleCsv(g, csvWriteOpts)) + val entries = parts.zipWithIndex.map { case (b, i) => (s"e-$i.csv", b) } + + val plainHolder = Utils.createTempDir(namePrefix = "archive-test-tar") + val gzHolder = Utils.createTempDir(namePrefix = "archive-test-targz") + try { + val plainTar = new File(plainHolder, "infer.tar") + writeTar(plainTar, entries) + val gzTar = new File(gzHolder, "infer.tar.gz") + writeTarGz(gzTar, entries) + + val plainSchema = spark.read.option("header", "true").option("inferSchema", "true") + .csv(plainTar.getCanonicalPath).schema + val gzSchema = spark.read.option("header", "true").option("inferSchema", "true") + .csv(gzTar.getCanonicalPath).schema + assert(plainSchema == gzSchema, + s"schema inference parity broken; tar=$plainSchema tar.gz=$gzSchema") + } finally { + Utils.deleteRecursively(plainHolder) + Utils.deleteRecursively(gzHolder) + } + } +} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala new file mode 100644 index 0000000000000..551e98b510abb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{ByteArrayOutputStream, Closeable, File, FileOutputStream, InputStream, OutputStream} +import java.nio.charset.StandardCharsets +import java.util.Properties +import java.util.zip.GZIPOutputStream + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkFunSuite, TaskContext, TaskContextImpl} + +/** + * Unit tests for the streaming [[ArchiveReader]] core: `isArchivePath` dispatch and `readEntries` + * (entry ordering, gzip handling, dir/dotfile skipping, lazy advance, the non-closing entry + * stream, and cleanup). 
Entries are never unpacked to disk -- they are consumed as streams. + */ +class ArchiveReaderSuite extends SparkFunSuite { + + private case class Entry(name: String, data: Array[Byte], isDir: Boolean = false) + + private def writeTar(file: File, entries: Seq[Entry]): Unit = + writeTarTo(new FileOutputStream(file), entries) + + /** Write a gzipped tar, used to verify the `.tar.gz` / `.tgz` archive paths. */ + private def writeTarGz(file: File, entries: Seq[Entry]): Unit = + writeTarTo(new GZIPOutputStream(new FileOutputStream(file)), entries) + + private def writeTarTo(rawOut: OutputStream, entries: Seq[Entry]): Unit = { + val out = new TarArchiveOutputStream(rawOut) + try { + entries.foreach { e => + // TarArchiveEntry treats a trailing slash in the name as a directory marker. + val rawName = if (e.isDir && !e.name.endsWith("/")) e.name + "/" else e.name + val tarEntry = new TarArchiveEntry(rawName) + if (!e.isDir) tarEntry.setSize(e.data.length.toLong) + out.putArchiveEntry(tarEntry) + if (!e.isDir) out.write(e.data) + out.closeArchiveEntry() + } + out.finish() + } finally out.close() + } + + private def textEntry(name: String, body: String): Entry = + Entry(name, body.getBytes(StandardCharsets.UTF_8)) + + private def readAll(in: InputStream): Array[Byte] = { + val out = new ByteArrayOutputStream() + val buf = new Array[Byte](4096) + var n = in.read(buf) + while (n >= 0) { + out.write(buf, 0, n) + n = in.read(buf) + } + out.toByteArray + } + + /** Drains every entry into `(name, decodedText)` pairs through `ArchiveReader.readEntries`. 
*/ + private def collect(file: File): Seq[(String, String)] = + ArchiveReader.readEntries(new Path(file.toURI), new Configuration()) { (name, in) => + Iterator.single((name, new String(readAll(in), StandardCharsets.UTF_8))) + }.toList + + // ----- isArchivePath ------------------------------------------------------ + + test("isArchivePath: positive cases") { + Seq( + "foo.tar", "FOO.TAR", "/a/b/c/x.tar", "weird.TaR", + "foo.tar.gz", "FOO.TAR.GZ", "mixed.Tar.Gz", "/a/b/c/x.tar.gz", + "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz" + ).foreach { p => + assert(ArchiveReader.isArchivePath(new Path(p)), s"expected archive match for $p") + } + } + + test("isArchivePath: negative cases") { + Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", "data.zip", + "foo.tar.bz2", "foo.targz").foreach { p => + assert(!ArchiveReader.isArchivePath(new Path(p)), s"expected non-match for $p") + } + } + + // ----- readEntries -------------------------------------------------------- + + test("readEntries: empty tar yields empty iterator") { + withTempDir { dir => + val tar = new File(dir, "empty.tar") + writeTar(tar, Seq.empty) + assert(collect(tar).isEmpty) + } + } + + test("readEntries: single entry exposes its name and bytes") { + withTempDir { dir => + val tar = new File(dir, "single.tar") + writeTar(tar, Seq(textEntry("only.csv", "hello\n"))) + assert(collect(tar) == Seq("only.csv" -> "hello\n")) + } + } + + test("readEntries: multiple entries chained in tar order") { + withTempDir { dir => + val tar = new File(dir, "multi.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c"))) + assert(collect(tar) == Seq("a.csv" -> "a", "b.csv" -> "b", "c.csv" -> "c")) + } + } + + test("readEntries: gzipped tar (.tar.gz) via Hadoop codec factory") { + withTempDir { dir => + val tarGz = new File(dir, "data.tar.gz") + writeTarGz(tarGz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + assert(collect(tarGz) == Seq("a.csv" -> "a", "b.csv" -> "b")) + } + } + 
+ test("readEntries: gzipped tar (.tgz) via explicit GZIPInputStream wrap") { + withTempDir { dir => + val tgz = new File(dir, "data.tgz") + writeTarGz(tgz, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + assert(collect(tgz) == Seq("a.csv" -> "a", "b.csv" -> "b")) + } + } + + test("readEntries: directory entries are skipped") { + withTempDir { dir => + val tar = new File(dir, "dirs.tar") + writeTar(tar, Seq( + Entry("subdir", Array.emptyByteArray, isDir = true), + textEntry("subdir/data.csv", "x"))) + assert(collect(tar) == Seq("subdir/data.csv" -> "x")) + } + } + + test("readEntries: dotfile entries (e.g. macOS ._foo) are skipped") { + withTempDir { dir => + val tar = new File(dir, "dots.tar") + writeTar(tar, Seq( + textEntry("._real.csv", "junk"), // macOS AppleDouble sidecar + textEntry(".hidden", "ignored"), // bare dotfile + textEntry("real.csv", "kept"), + textEntry("nested/._sidecar", "junk2"))) // dotfile in a subdir + assert(collect(tar) == Seq("real.csv" -> "kept")) + } + } + + test("readEntries: advances lazily, one entry at a time") { + withTempDir { dir => + val tar = new File(dir, "lazy.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c"))) + + val opened = ArrayBuffer[String]() + // parseEntry yields a single element without reading the stream, so each invocation maps to + // exactly one consumed output element -- letting us observe when the next entry is opened. + val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) => + opened += name + Iterator.single(name) + } + + // Construction opens only the first entry; later entries open on demand as iteration + // crosses each entry boundary (never all upfront). + assert(opened.toList == List("a.csv")) + assert(it.hasNext) + assert(it.next() == "a.csv") + // Entry 0 is still in flight until its element is consumed, so entry 1 stays unopened. 
+ assert(opened.toList == List("a.csv")) + assert(it.next() == "b.csv") + assert(opened.toList == List("a.csv", "b.csv")) + assert(it.next() == "c.csv") + assert(opened.toList == List("a.csv", "b.csv", "c.csv")) + assert(!it.hasNext) + assert(opened.size == 3) + } + } + + test("readEntries: a parseEntry that closes its stream still advances to the next entry") { + withTempDir { dir => + val tar = new File(dir, "close.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + + val seen = ArrayBuffer[String]() + val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, in) => + val body = new String(readAll(in), StandardCharsets.UTF_8) + in.close() // must NOT close the underlying archive + seen += body + Iterator.single(name) + } + assert(it.toList == List("a.csv", "b.csv")) + assert(seen.toList == List("a", "b")) + } + } + + test("readEntries: close() is safe, idempotent, and stops iteration") { + withTempDir { dir => + val tar = new File(dir, "closeable.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + + val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) => + Iterator.single(name) + } + assert(it.hasNext) + it.asInstanceOf[Closeable].close() + it.asInstanceOf[Closeable].close() // idempotent + assert(!it.hasNext) + } + } + + test("readEntries: TaskContext completion cleans up without error") { + withTempDir { dir => + val tar = new File(dir, "ctx.tar") + writeTar(tar, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + + val ctx = new TaskContextImpl( + stageId = 0, + stageAttemptNumber = 0, + partitionId = 0, + taskAttemptId = 1L, + attemptNumber = 0, + numPartitions = 0, + taskMemoryManager = null, + localProperties = new Properties, + metricsSystem = null) + TaskContext.setTaskContext(ctx) + try { + val it = ArchiveReader.readEntries(new Path(tar.toURI), new Configuration()) { (name, _) => + Iterator.single(name) + } + assert(it.hasNext) + 
it.next() // open the archive and register the completion listener + // Simulate task completion without exhausting/closing the iterator. + ctx.markTaskCompleted(None) + } finally { + TaskContext.unset() + } + } + } +} From bde07edb1794aa375982760730596278099df695 Mon Sep 17 00:00:00 2001 From: Akshat Shenoi Date: Fri, 29 May 2026 23:26:00 +0000 Subject: [PATCH 2/2] [SPARK-57135][SQL] Move archive CSV read into CSVDataSource.readFile Address review feedback: move the per-entry tar-archive streaming/parsing from CSVFileFormat.buildReader into the CSVDataSource.readFile overrides via a shared readArchive helper (archiveLines moves to TextInputCSVDataSource with brief @param/@return docs), and update CSVPartitionReaderFactory to the new readFile signature (archiveReadEnabled = false; the V2 reader does not read archives). --- .../datasources/csv/CSVDataSource.scala | 120 ++++++++++++++++-- .../datasources/csv/CSVFileFormat.scala | 76 +---------- .../v2/csv/CSVPartitionReaderFactory.scala | 21 ++- 3 files changed, 125 insertions(+), 92 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 6255df5d3ae35..c0640cbc20711 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.io.{FileNotFoundException, IOException} +import java.io.{FileNotFoundException, InputStream, IOException} import java.nio.charset.{Charset, StandardCharsets} import scala.util.control.NonFatal @@ -25,8 +25,10 @@ import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.Text 
import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.util.LineReader import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} @@ -53,14 +55,18 @@ abstract class CSVDataSource extends Serializable { def isSplitable: Boolean /** - * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. When `archiveReadEnabled` is set + * and `file` is a tar archive, its entries are streamed through the parser (see [[readArchive]]); + * otherwise the file is parsed directly. `getHeaderChecker` / `getParser` build a fresh header + * checker / parser -- per entry for archives, once for a regular file. */ def readFile( conf: Configuration, file: PartitionedFile, - parser: UnivocityParser, - headerChecker: CSVHeaderChecker, - requiredSchema: StructType): Iterator[InternalRow] + getParser: () => UnivocityParser, + getHeaderChecker: (Boolean, String) => CSVHeaderChecker, + requiredSchema: StructType, + archiveReadEnabled: Boolean): Iterator[InternalRow] /** * Infers the schema from `inputPaths` files. @@ -101,6 +107,37 @@ abstract class CSVDataSource extends Serializable { inputPaths: Seq[FileStatus], parsedOptions: CSVOptions): StructType + /** + * Streams a tar archive (`.tar`/`.tar.gz`/`.tgz`) entry by entry through the CSV parser without + * unpacking it to disk. The whole archive is a single split (see `CSVFileFormat.isSplitable`); a + * fresh header checker and parser are built per entry so each entry is parsed exactly like a + * standalone CSV file -- its header, if any, validated and dropped independently. The + * mode-specific `parseEntry` turns one entry into rows via `parseStream` / `parseIterator`. + * + * @param conf Hadoop configuration used to open and (when needed) decompress the archive. + * @param file the archive file; its bytes are never materialized to disk. 
+ * @param getParser builds a fresh [[UnivocityParser]]. + * @param getHeaderChecker builds a fresh [[CSVHeaderChecker]] for `(isStartOfFile, source)`, + * where `source` (`CSV archive entry: <archive>!/<entry>`) names the + * entry in error messages. + * @param parseEntry parses one entry's `(parser, headerChecker, stream)` into rows. + * @return the concatenated rows of every non-skipped entry, read one entry at a time. + */ + protected def readArchive( + conf: Configuration, + file: PartitionedFile, + getParser: () => UnivocityParser, + getHeaderChecker: (Boolean, String) => CSVHeaderChecker)( + parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow]) + : Iterator[InternalRow] = { + ArchiveReader.readEntries(file.toPath, conf) { (entryName, in) => + val headerChecker = + getHeaderChecker(true, s"CSV archive entry: ${file.urlEncodedPath}!/$entryName") + val parser = getParser() + parseEntry(parser, headerChecker, in) + } + } + /** + * Infers a CSV schema from tar archives (`.tar`/`.tar.gz`/`.tgz`) by streaming their entries -- + * the archive is never unpacked to disk. Each entry is tokenized like a standalone CSV file (its @@ -211,9 +248,20 @@ object TextInputCSVDataSource extends CSVDataSource { override def readFile( conf: Configuration, file: PartitionedFile, - parser: UnivocityParser, - headerChecker: CSVHeaderChecker, - requiredSchema: StructType): Iterator[InternalRow] = { + getParser: () => UnivocityParser, + getHeaderChecker: (Boolean, String) => CSVHeaderChecker, + requiredSchema: StructType, + archiveReadEnabled: Boolean): Iterator[InternalRow] = { + if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) { + // Stream each tar entry through the line-based parser, treating the entry exactly like a + // standalone CSV file (a fresh parser/header checker is built per entry). 
+ return readArchive(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) => + UnivocityParser.parseIterator( + archiveLines(in, parser.options), parser, headerChecker, requiredSchema) + } + } + val parser = getParser() + val headerChecker = getHeaderChecker(file.start == 0, s"CSV file: ${file.urlEncodedPath}") val lines = { val linesReader = Utils.createResourceUninterruptiblyIfInTaskThread( new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf) @@ -229,6 +277,46 @@ object TextInputCSVDataSource extends CSVDataSource { UnivocityParser.parseIterator(lines, parser, headerChecker, requiredSchema) } + /** + * Splits a single archive entry's bytes into CSV lines, mirroring the line semantics of + * `HadoopFileLinesReader` used by the non-archive [[readFile]] path above: lines are split with + * Hadoop's `LineReader` (honoring a custom line separator), decoded with the configured charset, + * and the separator is re-appended so `UnivocityParser` does not raise EOF on the final line. + * + * @param in bytes of one already-decompressed archive entry; not closed here (the archive owns + * the underlying stream). + * @param options CSV options supplying the read line separator and charset. + * @return an iterator over the entry's lines, each terminated with the line separator. 
+ */ + private def archiveLines(in: InputStream, options: CSVOptions): Iterator[String] = { + val lineReader = options.lineSeparatorInRead match { + case Some(sep) => new LineReader(in, sep) + case _ => new LineReader(in) + } + val newline = options.lineSeparatorInRead.getOrElse( + Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte)) + new Iterator[String] { + private val text = new Text() + private var finished = false + private var hasValue = false + + override def hasNext: Boolean = { + if (!finished && !hasValue) { + finished = lineReader.readLine(text) == 0 + hasValue = !finished + } + !finished + } + + override def next(): String = { + if (!hasNext) throw new NoSuchElementException + hasValue = false + text.append(newline, 0, newline.length) + new String(text.getBytes, 0, text.getLength, options.charset) + } + } + } + override def infer( sparkSession: SparkSession, inputPaths: Seq[FileStatus], @@ -300,9 +388,19 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging { override def readFile( conf: Configuration, file: PartitionedFile, - parser: UnivocityParser, - headerChecker: CSVHeaderChecker, - requiredSchema: StructType): Iterator[InternalRow] = { + getParser: () => UnivocityParser, + getHeaderChecker: (Boolean, String) => CSVHeaderChecker, + requiredSchema: StructType, + archiveReadEnabled: Boolean): Iterator[InternalRow] = { + if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) { + // Stream each tar entry whole through the multi-line parser (a fresh parser/header checker + // is built per entry). 
+ return readArchive(conf, file, getParser, getHeaderChecker) { (parser, headerChecker, in) => + UnivocityParser.parseStream(in, parser, headerChecker, requiredSchema) + } + } + val parser = getParser() + val headerChecker = getHeaderChecker(file.start == 0, s"CSV file: ${file.urlEncodedPath}") headerChecker.setHeaderForSingleVariantColumn = CSVDataSource.setHeaderForSingleVariantColumn(conf, file, parser) UnivocityParser.parseStream( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 5c800181b69ec..b7e04c3d9af03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -17,13 +17,9 @@ package org.apache.spark.sql.execution.datasources.csv -import java.io.InputStream - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.util.LineReader import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -138,38 +134,13 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { def newParser(): UnivocityParser = new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters) + def getHeaderChecker(isStartOfFile: Boolean, source: String): CSVHeaderChecker = + new CSVHeaderChecker(schema, parsedOptions, source, isStartOfFile) - if (archiveReadEnabled && ArchiveReader.isArchivePath(file.toPath)) { - // The whole archive is a single split (see `isSplitable`). Stream each entry through the - // CSV parser without materializing it to disk, building a fresh parser/header checker per - // entry. 
Each entry is treated as the start of its own file, so its header (if any) is - // handled exactly as it would be for a standalone CSV file. - ArchiveReader.readEntries(file.toPath, conf) { (entryName, in) => - val parser = newParser() - val headerChecker = new CSVHeaderChecker( - schema, - parsedOptions, - source = s"CSV archive entry: ${file.urlEncodedPath}!/$entryName", - isStartOfFile = true) - if (parsedOptions.multiLine) { - UnivocityParser.parseStream(in, parser, headerChecker, requiredSchema) - } else { - UnivocityParser.parseIterator( - archiveLines(in, parsedOptions), parser, headerChecker, requiredSchema) - } - } - } else { - val parser = newParser() - val isStartOfFile = file.start == 0 - val headerChecker = new CSVHeaderChecker( - schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) - CSVDataSource(parsedOptions).readFile( - conf, - file, - parser, - headerChecker, - requiredSchema) - } + // CSVDataSource.readFile parses the file directly, or streams it entry by entry when archive + // reads are enabled and `file` is a tar archive (a single split, see `isSplitable`). + CSVDataSource(parsedOptions).readFile( + conf, file, () => newParser(), getHeaderChecker, requiredSchema, archiveReadEnabled) } } @@ -209,39 +180,4 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister { conf.sessionLocalTimeZone, conf.columnNameOfCorruptRecord) } - - /** - * Splits a single archive entry's bytes into CSV lines, mirroring the line semantics of - * `HadoopFileLinesReader` used by [[TextInputCSVDataSource.readFile]]: lines are split with - * Hadoop's `LineReader` (honoring a custom line separator), decoded with the configured charset, - * and the separator is re-appended so `UnivocityParser` does not raise EOF on the final line. 
- */ - private def archiveLines(in: InputStream, options: CSVOptions): Iterator[String] = { - val lineReader = options.lineSeparatorInRead match { - case Some(sep) => new LineReader(in, sep) - case _ => new LineReader(in) - } - val newline = options.lineSeparatorInRead.getOrElse( - Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte)) - new Iterator[String] { - private val text = new Text() - private var finished = false - private var hasValue = false - - override def hasNext: Boolean = { - if (!finished && !hasValue) { - finished = lineReader.readLine(text) == 0 - hasValue = !finished - } - !finished - } - - override def next(): String = { - if (!hasNext) throw new NoSuchElementException - hasValue = false - text.append(newline, 0, newline.length) - new String(text.getBytes, 0, text.getLength, options.charset) - } - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index 65eff0647ee2b..f6bfc0eb1ed74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -53,25 +53,24 @@ case class CSVPartitionReaderFactory( dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) val actualReadDataSchema = StructType( readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) - val parser = new UnivocityParser( - actualDataSchema, - actualReadDataSchema, - options, - filters) val schema = if (options.isColumnPruningEnabled(readDataSchema)) { actualReadDataSchema } else { actualDataSchema } - val isStartOfFile = file.start == 0 - val headerChecker = new CSVHeaderChecker( - schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) + // The V2 CSV reader does not read 
tar archives, so a single parser and header checker are + // built per file. + val getParser: () => UnivocityParser = () => + new UnivocityParser(actualDataSchema, actualReadDataSchema, options, filters) + val getHeaderChecker: (Boolean, String) => CSVHeaderChecker = (isStartOfFile, source) => + new CSVHeaderChecker(schema, options, source, isStartOfFile) val iter = CSVDataSource(options).readFile( conf, file, - parser, - headerChecker, - readDataSchema) + getParser, + getHeaderChecker, + readDataSchema, + archiveReadEnabled = false) val fileReader = new PartitionReaderFromIterator[InternalRow](iter) new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)