Skip to content

Commit 3fcc6c8

Browse files
committed
PARQUET-3479: Add configuration to disable early dictionary compression check
1 parent 28593d5 commit 3fcc6c8

7 files changed

Lines changed: 318 additions & 11 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class ParquetProperties {
6767
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
6868
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
6969

70+
public static final long DEFAULT_DICTIONARY_CHECK_AFTER_BYTES = 0;
7071
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
7172

7273
/**
@@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) {
131132
private final int rowGroupRowCountLimit;
132133
private final int pageRowCountLimit;
133134
private final boolean pageWriteChecksumEnabled;
135+
private final long dictionaryCheckAfterBytes;
134136
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
135137
private final Map<String, String> extraMetaData;
136138
private final ColumnProperty<Boolean> statistics;
@@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) {
163165
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
164166
this.pageRowCountLimit = builder.pageRowCountLimit;
165167
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
168+
this.dictionaryCheckAfterBytes = builder.dictionaryCheckAfterBytes;
166169
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
167170
this.extraMetaData = builder.extraMetaData;
168171
this.statistics = builder.statistics.build();
@@ -322,6 +325,17 @@ public boolean getPageWriteChecksumEnabled() {
322325
return pageWriteChecksumEnabled;
323326
}
324327

328+
/**
329+
* Returns the byte threshold after which the dictionary compression check is performed.
330+
* A value of 0 means check on the first page (backward compatible default). Higher values
331+
* delay the check until that many raw bytes have been accumulated across pages.
332+
*
333+
* @return the byte threshold for the dictionary compression check
334+
*/
335+
public long getDictionaryCheckAfterBytes() {
336+
return dictionaryCheckAfterBytes;
337+
}
338+
325339
public OptionalLong getBloomFilterNDV(ColumnDescriptor column) {
326340
Long ndv = bloomFilterNDVs.getValue(column);
327341
return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
@@ -415,6 +429,7 @@ public static class Builder {
415429
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
416430
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
417431
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
432+
private long dictionaryCheckAfterBytes = DEFAULT_DICTIONARY_CHECK_AFTER_BYTES;
418433
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
419434
private Map<String, String> extraMetaData = new HashMap<>();
420435
private final ColumnProperty.Builder<Boolean> statistics;
@@ -450,6 +465,7 @@ private Builder(ParquetProperties toCopy) {
450465
this.allocator = toCopy.allocator;
451466
this.pageRowCountLimit = toCopy.pageRowCountLimit;
452467
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
468+
this.dictionaryCheckAfterBytes = toCopy.dictionaryCheckAfterBytes;
453469
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
454470
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
455471
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
@@ -709,6 +725,19 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
709725
return this;
710726
}
711727

728+
/**
729+
* Set the raw data byte threshold after which the dictionary compression check is performed.
730+
* A value of 0 means check on the first page (backward compatible default). Higher values
731+
* delay the check until that many raw bytes have been accumulated across pages.
732+
*
733+
* @param val byte threshold (default: 0)
734+
* @return this builder for method chaining
735+
*/
736+
public Builder withDictionaryCheckAfterBytes(long val) {
737+
this.dictionaryCheckAfterBytes = val;
738+
return this;
739+
}
740+
712741
public Builder withExtraMetaData(Map<String, String> extraMetaData) {
713742
this.extraMetaData = extraMetaData;
714743
return this;

parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack(
111111
ValuesWriter writerToFallBackTo) {
112112
if (parquetProperties.isDictionaryEnabled(path)) {
113113
return FallbackValuesWriter.of(
114-
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo);
114+
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
115+
writerToFallBackTo,
116+
parquetProperties.getDictionaryCheckAfterBytes());
115117
} else {
116118
return writerToFallBackTo;
117119
}

parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e
3030

3131
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
3232
I initialWriter, F fallBackWriter) {
33-
return new FallbackValuesWriter<>(initialWriter, fallBackWriter);
33+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, 0);
34+
}
35+
36+
public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
37+
I initialWriter, F fallBackWriter, long checkAfterBytes) {
38+
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkAfterBytes);
3439
}
3540

3641
/**
@@ -44,6 +49,14 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
4449

4550
private boolean fellBackAlready = false;
4651

52+
private boolean compressionChecked = false;
53+
54+
private final long checkAfterBytes;
55+
56+
/* Accumulates raw bytes across pages (only reset in resetDictionary) so the
57+
* threshold check works even when individual pages are smaller than checkAfterBytes. */
58+
private long cumulativeRawBytes = 0;
59+
4760
/**
4861
* writer currently written to
4962
*/
@@ -57,16 +70,16 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
5770
*/
5871
private long rawDataByteSize = 0;
5972

60-
/**
61-
* indicates if this is the first page being processed
62-
*/
63-
private boolean firstPage = true;
64-
6573
public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
74+
this(initialWriter, fallBackWriter, 0);
75+
}
76+
77+
public FallbackValuesWriter(I initialWriter, F fallBackWriter, long checkAfterBytes) {
6678
super();
6779
this.initialWriter = initialWriter;
6880
this.fallBackWriter = fallBackWriter;
6981
this.currentWriter = initialWriter;
82+
this.checkAfterBytes = checkAfterBytes;
7083
}
7184

7285
@Override
@@ -79,8 +92,8 @@ public long getBufferedSize() {
7992

8093
@Override
8194
public BytesInput getBytes() {
82-
if (!fellBackAlready && firstPage) {
83-
// we use the first page to decide if we're going to use this encoding
95+
if (!fellBackAlready && !compressionChecked && cumulativeRawBytes >= checkAfterBytes) {
96+
compressionChecked = true;
8497
BytesInput bytes = initialWriter.getBytes();
8598
if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
8699
fallBack();
@@ -103,7 +116,6 @@ public Encoding getEncoding() {
103116
@Override
104117
public void reset() {
105118
rawDataByteSize = 0;
106-
firstPage = false;
107119
currentWriter.reset();
108120
}
109121

@@ -131,8 +143,9 @@ public void resetDictionary() {
131143
}
132144
currentWriter = initialWriter;
133145
fellBackAlready = false;
146+
compressionChecked = false;
147+
cumulativeRawBytes = 0;
134148
initialUsedAndHadDictionary = false;
135-
firstPage = true;
136149
}
137150

138151
@Override
@@ -167,6 +180,7 @@ private void fallBack() {
167180
@Override
168181
public void writeByte(int value) {
169182
rawDataByteSize += 1;
183+
cumulativeRawBytes += 1;
170184
currentWriter.writeByte(value);
171185
checkFallback();
172186
}
@@ -175,34 +189,39 @@ public void writeByte(int value) {
175189
public void writeBytes(Binary v) {
176190
// for rawdata, length(4 bytes int) is stored, followed by the binary content itself
177191
rawDataByteSize += v.length() + 4;
192+
cumulativeRawBytes += v.length() + 4;
178193
currentWriter.writeBytes(v);
179194
checkFallback();
180195
}
181196

182197
@Override
183198
public void writeInteger(int v) {
184199
rawDataByteSize += 4;
200+
cumulativeRawBytes += 4;
185201
currentWriter.writeInteger(v);
186202
checkFallback();
187203
}
188204

189205
@Override
190206
public void writeLong(long v) {
191207
rawDataByteSize += 8;
208+
cumulativeRawBytes += 8;
192209
currentWriter.writeLong(v);
193210
checkFallback();
194211
}
195212

196213
@Override
197214
public void writeFloat(float v) {
198215
rawDataByteSize += 4;
216+
cumulativeRawBytes += 4;
199217
currentWriter.writeFloat(v);
200218
checkFallback();
201219
}
202220

203221
@Override
204222
public void writeDouble(double v) {
205223
rawDataByteSize += 8;
224+
cumulativeRawBytes += 8;
206225
currentWriter.writeDouble(v);
207226
checkFallback();
208227
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.column.values.fallback;
20+
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
25+
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
26+
import org.apache.parquet.column.Encoding;
27+
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
28+
import org.apache.parquet.column.values.plain.PlainValuesWriter;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
public class TestFallbackValuesWriter {
34+
35+
private TrackingByteBufferAllocator allocator;
36+
37+
@Before
38+
public void initAllocator() {
39+
allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
40+
}
41+
42+
@After
43+
public void closeAllocator() {
44+
allocator.close();
45+
}
46+
47+
/**
48+
* With threshold=0, the check fires on the first page and falls back for high-cardinality data.
49+
*/
50+
@Test
51+
public void testThresholdZeroFallsBackImmediately() throws Exception {
52+
int dictPageSize = 1024 * 1024;
53+
54+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
55+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
56+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
57+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
58+
FallbackValuesWriter.of(dictWriter, plainWriter, 0);
59+
60+
try {
61+
for (int i = 0; i < 1000; i++) {
62+
writer.writeInteger(i);
63+
}
64+
writer.getBytes();
65+
66+
assertFalse(
67+
"Should fall back to plain encoding with threshold=0 and high cardinality",
68+
writer.getEncoding().usesDictionary());
69+
} finally {
70+
writer.close();
71+
}
72+
}
73+
74+
/**
75+
* With a large threshold, the check never fires and dictionary encoding is preserved.
76+
*/
77+
@Test
78+
public void testLargeThresholdPreservesDictionary() throws Exception {
79+
int dictPageSize = 1024 * 1024;
80+
81+
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
82+
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
83+
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);
84+
FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
85+
FallbackValuesWriter.of(dictWriter, plainWriter, Long.MAX_VALUE);
86+
87+
try {
88+
for (int i = 0; i < 1000; i++) {
89+
writer.writeInteger(i);
90+
}
91+
writer.getBytes();
92+
93+
assertTrue(
94+
"Dictionary encoding should be preserved with large threshold",
95+
writer.getEncoding().usesDictionary());
96+
} finally {
97+
writer.close();
98+
}
99+
}
100+
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public static enum JobSummaryLevel {
161161
public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
162162
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
163163
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
164+
public static final String DICTIONARY_CHECK_AFTER_BYTES = "parquet.dictionary.check.after.raw.bytes";
164165
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
165166
public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled";
166167

@@ -412,6 +413,14 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
412413
return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
413414
}
414415

416+
public static void setDictionaryCheckAfterBytes(Configuration conf, long val) {
417+
conf.setLong(DICTIONARY_CHECK_AFTER_BYTES, val);
418+
}
419+
420+
public static long getDictionaryCheckAfterBytes(Configuration conf) {
421+
return conf.getLong(DICTIONARY_CHECK_AFTER_BYTES, ParquetProperties.DEFAULT_DICTIONARY_CHECK_AFTER_BYTES);
422+
}
423+
415424
public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) {
416425
getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled);
417426
}
@@ -526,6 +535,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
526535
.withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
527536
.withPageRowCountLimit(getPageRowCountLimit(conf))
528537
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
538+
.withDictionaryCheckAfterBytes(getDictionaryCheckAfterBytes(conf))
529539
.withStatisticsEnabled(getStatisticsEnabled(conf));
530540
new ColumnConfigParser()
531541
.withColumnConfig(

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,17 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
771771
return self();
772772
}
773773

774+
/**
775+
* Set the raw data byte threshold after which the dictionary compression check is performed.
776+
*
777+
* @param val byte threshold (0 means check on every page)
778+
* @return this builder for method chaining.
779+
*/
780+
public SELF withDictionaryCheckAfterBytes(long val) {
781+
encodingPropsBuilder.withDictionaryCheckAfterBytes(val);
782+
return self();
783+
}
784+
774785
/**
775786
* Set max Bloom filter bytes for related columns.
776787
*

0 commit comments

Comments
 (0)