Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ protected LogicalTypeAnnotation fromString(List<String> params) {
protected LogicalTypeAnnotation fromString(List<String> params) {
return unknownType();
}
},
// Token for the FILE logical type.
FILE {
@Override
protected LogicalTypeAnnotation fromString(List<String> params) {
// params is not read here: the result of fileType() is returned unconditionally.
return fileType();
}
};

/**
 * Produces the annotation that corresponds to this token.
 *
 * @param params string parameters for the annotation (presumably the textual type
 *               parameters; the FILE implementation does not read them)
 * @return the annotation for this token
 */
protected abstract LogicalTypeAnnotation fromString(List<String> params);
Expand Down Expand Up @@ -377,6 +383,10 @@ public static UnknownLogicalTypeAnnotation unknownType() {
return UnknownLogicalTypeAnnotation.INSTANCE;
}

/**
 * Returns the FILE logical type annotation.
 *
 * @return the shared {@link FileLogicalTypeAnnotation} instance
 */
public static FileLogicalTypeAnnotation fileType() {
return FileLogicalTypeAnnotation.INSTANCE;
}

public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation {
private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation();

Expand Down Expand Up @@ -1221,6 +1231,55 @@ public boolean equals(Object obj) {
}
}

/**
 * Logical type annotation that marks a group (struct) as a reference to an external file.
 *
 * <p>The annotated group is made up of the following fields, identified by name:
 * <ul>
 *   <li>{@code path} (required): STRING - the path/URI of the file</li>
 *   <li>{@code size} (optional): INT64 - size of the file content in bytes</li>
 *   <li>{@code offset} (optional): INT64 - byte offset within the file; when present,
 *       {@code size} must be present as well</li>
 *   <li>{@code etag} (optional): STRING - opaque identifier for the file version</li>
 * </ul>
 * Fields with any other name are not permitted.
 *
 * <p>The annotation carries no parameters: the constructor is private and a single
 * shared instance is kept, and any two instances compare equal.
 */
public static class FileLogicalTypeAnnotation extends LogicalTypeAnnotation {
  /** Name of the one mandatory field of a FILE-annotated group. */
  public static final String PATH_FIELD = "path";

  /** Names of the fields that may additionally appear in a FILE-annotated group. */
  public static final Set<String> OPTIONAL_FIELD_NAMES = Set.of("size", "offset", "etag");

  private static final FileLogicalTypeAnnotation INSTANCE = new FileLogicalTypeAnnotation();

  private FileLogicalTypeAnnotation() {}

  @Override
  LogicalTypeToken getType() {
    return LogicalTypeToken.FILE;
  }

  /** FILE has no {@link OriginalType} counterpart, so this is always {@code null}. */
  @Override
  public OriginalType toOriginalType() {
    return null;
  }

  @Override
  public <T> Optional<T> accept(LogicalTypeAnnotationVisitor<T> visitor) {
    return visitor.visit(this);
  }

  /** Constant per class, matching {@link #equals(Object)}, which treats all instances as equal. */
  @Override
  public int hashCode() {
    return getClass().hashCode();
  }

  /** Every FILE annotation is equal to every other FILE annotation. */
  @Override
  public boolean equals(Object obj) {
    return obj instanceof FileLogicalTypeAnnotation;
  }
}

public static class GeometryLogicalTypeAnnotation extends LogicalTypeAnnotation {
private final String crs;

Expand Down Expand Up @@ -1426,5 +1485,9 @@ default Optional<T> visit(GeographyLogicalTypeAnnotation geographyLogicalType) {
// Default handler for the UNKNOWN annotation: returns the result of empty()
// unless an implementing visitor overrides it.
default Optional<T> visit(UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) {
return empty();
}

// Default handler for the FILE annotation: returns the result of empty()
// unless an implementing visitor overrides it.
default Optional<T> visit(FileLogicalTypeAnnotation fileLogicalType) {
return empty();
}
}
}
22 changes: 22 additions & 0 deletions parquet-column/src/main/java/org/apache/parquet/schema/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -821,12 +821,34 @@ public THIS addFields(Type... types) {
/**
 * Creates the {@link GroupType} described by this builder.
 *
 * <p>When {@code newLogicalTypeSet} is false the group is created from
 * {@code getOriginalType()}. Otherwise it is created from {@code logicalTypeAnnotation},
 * and a FILE annotation additionally has its fields validated first.
 *
 * @param name the name of the group to create
 * @return the newly created group type
 */
@Override
protected GroupType build(String name) {
  if (!newLogicalTypeSet) {
    return new GroupType(repetition, name, getOriginalType(), fields, id);
  }

  final boolean annotatedAsFile =
      logicalTypeAnnotation instanceof LogicalTypeAnnotation.FileLogicalTypeAnnotation;
  if (annotatedAsFile) {
    // Reject malformed FILE groups before the type is constructed.
    validateFileTypeFields(name, fields);
  }
  return new GroupType(repetition, name, logicalTypeAnnotation, fields, id);
}

/**
 * Checks that the fields of a FILE-annotated group follow the layout described on
 * {@link LogicalTypeAnnotation.FileLogicalTypeAnnotation}.
 *
 * <p>The following rules are enforced, by field name:
 * <ul>
 *   <li>a field named {@code path} must exist and must be REQUIRED;</li>
 *   <li>every other field must be named {@code size}, {@code offset} or {@code etag};</li>
 *   <li>{@code offset} may only appear together with {@code size}.</li>
 * </ul>
 *
 * @param name the name of the group being built; used in error messages only
 * @param fields the fields of the group being built
 * @throws IllegalArgumentException if any of the rules above is violated
 */
private static void validateFileTypeFields(String name, List<Type> fields) {
  boolean hasPath = false;
  boolean hasSize = false;
  boolean hasOffset = false;
  for (Type field : fields) {
    String fieldName = field.getName();
    if (LogicalTypeAnnotation.FileLogicalTypeAnnotation.PATH_FIELD.equals(fieldName)) {
      Preconditions.checkArgument(
          field.getRepetition() == Type.Repetition.REQUIRED,
          "FILE type field 'path' must be REQUIRED in group '%s'",
          name);
      hasPath = true;
    } else {
      Preconditions.checkArgument(
          LogicalTypeAnnotation.FileLogicalTypeAnnotation.OPTIONAL_FIELD_NAMES.contains(fieldName),
          "FILE type group '%s' contains unrecognized field '%s'. Valid fields are: path, size, offset, etag",
          name,
          fieldName);
      if ("size".equals(fieldName)) {
        hasSize = true;
      } else if ("offset".equals(fieldName)) {
        hasOffset = true;
      }
    }
  }
  Preconditions.checkArgument(hasPath, "FILE type group '%s' must contain required field 'path'", name);
  // An offset is only meaningful together with a size, so a lone offset is rejected.
  Preconditions.checkArgument(
      !hasOffset || hasSize,
      "FILE type group '%s' contains field 'offset' but is missing field 'size'",
      name);
}

/**
 * Starts a {@code MapBuilder} created from {@code self()} and configured with the given repetition.
 *
 * @param repetition the repetition passed to the new map builder
 * @return the new map builder
 */
public MapBuilder<THIS> map(Type.Repetition repetition) {
return new MapBuilder<>(self()).repetition(repetition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,81 @@ public void testVariantLogicalTypeWithShredded() {
assertEquals(specVersion, ((LogicalTypeAnnotation.VariantLogicalTypeAnnotation) annotation).getSpecVersion());
}

/**
 * A FILE group holding only the required {@code path} field, created through the
 * {@link GroupType} constructor directly, prints as expected and exposes the FILE annotation.
 */
@Test
public void testFileLogicalTypePathOnly() {
  GroupType fileGroup = new GroupType(
      REQUIRED,
      "file_field",
      LogicalTypeAnnotation.fileType(),
      Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("path"));

  String expectedSchema = "required group file_field (FILE) {\n"
      + " required binary path (STRING);\n"
      + "}";
  assertEquals(expectedSchema, fileGroup.toString());

  LogicalTypeAnnotation logicalType = fileGroup.getLogicalTypeAnnotation();
  assertEquals(LogicalTypeAnnotation.LogicalTypeToken.FILE, logicalType.getType());
  // FILE has no OriginalType counterpart.
  assertNull(logicalType.toOriginalType());
  assertTrue(logicalType instanceof LogicalTypeAnnotation.FileLogicalTypeAnnotation);
}

/**
 * The builder accepts a FILE group that carries {@code path} together with all three
 * optional fields, and each field can be looked up by name afterwards.
 */
@Test
public void testFileLogicalTypeAllFields() {
  GroupType fileGroup = Types.requiredGroup()
      .as(LogicalTypeAnnotation.fileType())
      .required(BINARY)
      .as(LogicalTypeAnnotation.stringType())
      .named("path")
      .optional(INT64)
      .named("size")
      .optional(INT64)
      .named("offset")
      .optional(BINARY)
      .as(LogicalTypeAnnotation.stringType())
      .named("etag")
      .named("file_field");

  LogicalTypeAnnotation logicalType = fileGroup.getLogicalTypeAnnotation();
  assertTrue(logicalType instanceof LogicalTypeAnnotation.FileLogicalTypeAnnotation);
  assertEquals(4, fileGroup.getFieldCount());

  // Same lookups, in the same order, as one assertion per field.
  for (String fieldName : new String[] {"path", "size", "offset", "etag"}) {
    assertEquals(fieldName, fileGroup.getType(fieldName).getName());
  }
}

// Negative case: a FILE-annotated group built with only an optional "size" field (no "path")
// is expected to throw IllegalArgumentException from the builder.
// NOTE(review): the first assertThrows argument looks like the failure message — confirm against the helper.
@Test
public void testFileLogicalTypeRequiresPathField() {
assertThrows(
"FILE type group must contain required field 'path'",
IllegalArgumentException.class,
() -> Types.requiredGroup()
.as(LogicalTypeAnnotation.fileType())
.optional(INT64).named("size")
.named("missing_path"));
}

// Negative case: a FILE-annotated group with a valid "path" plus a field named "unknown_field"
// is expected to throw IllegalArgumentException from the builder.
// NOTE(review): the first assertThrows argument looks like the failure message — confirm against the helper.
@Test
public void testFileLogicalTypeRejectsUnrecognizedField() {
assertThrows(
"FILE type group must not contain unrecognized field names",
IllegalArgumentException.class,
() -> Types.requiredGroup()
.as(LogicalTypeAnnotation.fileType())
.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("path")
.optional(BINARY).named("unknown_field")
.named("file_with_bad_field"));
}

// Negative case: a FILE-annotated group whose "path" field is declared optional instead of
// required is expected to throw IllegalArgumentException from the builder.
// NOTE(review): the first assertThrows argument looks like the failure message — confirm against the helper.
@Test
public void testFileLogicalTypeRequiresRequiredPathField() {
assertThrows(
"FILE type field 'path' must be REQUIRED",
IllegalArgumentException.class,
() -> Types.requiredGroup()
.as(LogicalTypeAnnotation.fileType())
.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("path")
.named("file_with_optional_path"));
}

/**
* A convenience method to avoid a large number of @Test(expected=...) tests
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ public static LogicalType VARIANT(byte specificationVersion) {
// Pre-built LogicalType values, each wrapping a type struct created with its no-argument constructor.
public static final LogicalType BSON = LogicalType.BSON(new BsonType());
public static final LogicalType FLOAT16 = LogicalType.FLOAT16(new Float16Type());
public static final LogicalType UUID = LogicalType.UUID(new UUIDType());
// FILE logical type, wrapping a FileType created with its no-argument constructor.
public static final LogicalType FILE = LogicalType.FILE(new FileType());
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.parquet.format.Type;
import org.apache.parquet.format.TypeDefinedOrder;
import org.apache.parquet.format.Uncompressed;
import org.apache.parquet.format.FileType;
import org.apache.parquet.format.VariantType;
import org.apache.parquet.format.XxHash;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
Expand Down Expand Up @@ -576,6 +577,11 @@ public Optional<LogicalType> visit(LogicalTypeAnnotation.GeographyLogicalTypeAnn
geographyType.setAlgorithm(fromParquetEdgeInterpolationAlgorithm(geographyLogicalType.getAlgorithm()));
return of(LogicalType.GEOGRAPHY(geographyType));
}

// Maps the FILE annotation to the pre-built LogicalTypes.FILE value; the annotation argument is not read.
@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.FileLogicalTypeAnnotation fileLogicalType) {
return of(LogicalTypes.FILE);
}
}

private void addRowGroup(
Expand Down Expand Up @@ -1362,6 +1368,8 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) {
case VARIANT:
VariantType variant = type.getVARIANT();
return LogicalTypeAnnotation.variantType(variant.getSpecification_version());
case FILE:
return LogicalTypeAnnotation.fileType();
default:
throw new RuntimeException("Unknown logical type " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1977,4 +1977,52 @@ public void testEdgeInterpolationAlgorithmConversion() {
assertNull(ParquetMetadataConverter.fromParquetEdgeInterpolationAlgorithm(null));
assertNull(ParquetMetadataConverter.toParquetEdgeInterpolationAlgorithm(null));
}

/**
 * Round-trips a schema whose FILE group has {@code path} and all three optional fields
 * through {@code toParquetSchema} / {@code fromParquetSchema}, and checks that both the
 * schema and the FILE annotation come back unchanged.
 */
@Test
public void testFileLogicalType() {
  ParquetMetadataConverter converter = new ParquetMetadataConverter();

  MessageType original = Types.buildMessage()
      .requiredGroup()
      .as(LogicalTypeAnnotation.fileType())
      .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("path")
      .optional(PrimitiveTypeName.INT64).named("size")
      .optional(PrimitiveTypeName.INT64).named("offset")
      .optional(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("etag")
      .named("f")
      .named("example");

  // Convert to the Thrift-level schema elements and straight back again.
  MessageType roundTripped = converter.fromParquetSchema(converter.toParquetSchema(original), null);
  assertEquals(original, roundTripped);

  LogicalTypeAnnotation annotation = roundTripped.getType("f").getLogicalTypeAnnotation();
  assertTrue(annotation instanceof LogicalTypeAnnotation.FileLogicalTypeAnnotation);
  assertEquals(LogicalTypeAnnotation.fileType(), annotation);
}

/**
 * Round-trips a schema whose FILE group contains only the required {@code path} field
 * through {@code toParquetSchema} / {@code fromParquetSchema}, and checks that the schema
 * and the FILE annotation are preserved.
 */
@Test
public void testFileLogicalTypeRoundTripPathOnly() {
  ParquetMetadataConverter converter = new ParquetMetadataConverter();

  MessageType original = Types.buildMessage()
      .requiredGroup()
      .as(LogicalTypeAnnotation.fileType())
      .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("path")
      .named("f")
      .named("example");

  // Convert to the Thrift-level schema elements and straight back again.
  MessageType roundTripped = converter.fromParquetSchema(converter.toParquetSchema(original), null);
  assertEquals(original, roundTripped);

  LogicalTypeAnnotation annotation = roundTripped.getType("f").getLogicalTypeAnnotation();
  assertTrue(annotation instanceof LogicalTypeAnnotation.FileLogicalTypeAnnotation);
}
}
Loading