13 changes: 11 additions & 2 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -56,6 +56,7 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -107,6 +108,7 @@ public static class WriteBuilder {
private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
private MetricsConfig metricsConfig = MetricsConfig.getDefault();
private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;

private WriteBuilder(OutputFile file) {
this.file = file;
@@ -189,6 +191,15 @@ private CompressionCodecName codec() {
}
}

/**
* Sets the writer version. Default value is PARQUET_1_0 (v1).
*/
@VisibleForTesting
WriteBuilder withWriterVersion(WriterVersion version) {
this.writerVersion = version;
return this;
}

public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -222,8 +233,6 @@ public <D> FileAppender<D> build() throws IOException {
}
}

WriterVersion writerVersion = WriterVersion.PARQUET_1_0;

set("parquet.avro.write-old-list-structure", "false");
MessageType type = ParquetSchemaUtil.convert(schema, name);

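For context, a minimal sketch of how the new package-private knob could be exercised; outFile, schema, and record are assumptions standing in for real values, mirroring the createInputFile() setup in the test below, and the caller must live in org.apache.iceberg.parquet since withWriterVersion is not public:

import java.io.IOException;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

class WriterVersionUsageSketch {
  // Writes a single record with the Parquet v2 writer; the builder default stays PARQUET_1_0.
  static void writeWithV2(OutputFile outFile, Schema schema, Record record) throws IOException {
    try (FileAppender<Record> appender = Parquet.write(outFile)
        .schema(schema)
        .withWriterVersion(WriterVersion.PARQUET_2_0)
        .build()) {
      appender.add(record);
    }
  }
}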
Original file line number Diff line number Diff line change
@@ -406,6 +406,8 @@ private <T> Set<T> dict(int id, Comparator<T> comparator) {

for (int i = 0; i <= dict.getMaxId(); i++) {
switch (col.getPrimitiveType().getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY: dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
break;
Comment on lines +409 to +410
Contributor (Author):
I wanted to write the following, but Checkstyle failed on it because of indentation. I guess it is, or was, a Checkstyle bug. I don't have experience with Gradle, so I am not sure how to fix this in the Iceberg configuration; we probably need to upgrade Checkstyle.

Anyway, both the current version and the alternative below are correct, but I would prefer the latter.

Suggested change
From:
case FIXED_LEN_BYTE_ARRAY: dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
  break;
To:
case FIXED_LEN_BYTE_ARRAY: // Same as BINARY
case BINARY: dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
  break;
case INT32: dictSet.add((T) conversion.apply(dict.decodeToInt(i)));
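As a side note, the suggestion above relies on Java's case fall-through: an empty FIXED_LEN_BYTE_ARRAY label drops into the BINARY branch, so both types share one decode path. A self-contained sketch with illustrative names only (TypeName and decodePath are not from the PR):

public class FallThroughSketch {
  enum TypeName { FIXED_LEN_BYTE_ARRAY, BINARY, INT32 }

  static String decodePath(TypeName type) {
    switch (type) {
      case FIXED_LEN_BYTE_ARRAY: // same as BINARY: falls through
      case BINARY:
        return "decodeToBinary";
      case INT32:
        return "decodeToInt";
      default:
        throw new IllegalArgumentException("Unsupported: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(decodePath(TypeName.FIXED_LEN_BYTE_ARRAY)); // prints decodeToBinary
    System.out.println(decodePath(TypeName.BINARY));               // prints decodeToBinary
  }
}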
@@ -21,6 +21,11 @@

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -36,20 +41,28 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.DecimalType;
import org.apache.iceberg.types.Types.DoubleType;
import org.apache.iceberg.types.Types.FloatType;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.StringType;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.avro.AvroSchemaUtil.convert;
import static org.apache.iceberg.expressions.Expressions.and;
@@ -70,7 +83,10 @@
import static org.apache.iceberg.expressions.Expressions.startsWith;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;

@RunWith(Parameterized.class)
public class TestDictionaryRowGroupFilter {

private static final Types.StructType structFieldType =
@@ -88,7 +104,8 @@ public class TestDictionaryRowGroupFilter {
optional(10, "not_in_file", FloatType.get()),
optional(11, "all_nans", DoubleType.get()),
optional(12, "some_nans", FloatType.get()),
optional(13, "no_nans", DoubleType.get())
optional(13, "no_nans", DoubleType.get()),
optional(14, "decimal_fixed", DecimalType.of(20, 10)) // >18 precision to enforce FIXED_LEN_BYTE_ARRAY
);

private static final Types.StructType _structFieldType =
Expand All @@ -105,8 +122,8 @@ public class TestDictionaryRowGroupFilter {
optional(8, "_struct_not_null", _structFieldType),
optional(11, "_all_nans", DoubleType.get()),
optional(12, "_some_nans", FloatType.get()),
optional(13, "_no_nans", DoubleType.get())

optional(13, "_no_nans", DoubleType.get()),
optional(14, "_decimal_fixed", DecimalType.of(20, 10)) // >18 precision to enforce FIXED_LEN_BYTE_ARRAY
);

private static final String TOO_LONG_FOR_STATS;
Expand All @@ -121,14 +138,27 @@ public class TestDictionaryRowGroupFilter {

private static final int INT_MIN_VALUE = 30;
private static final int INT_MAX_VALUE = 79;
private static final BigDecimal DECIMAL_MIN_VALUE = new BigDecimal("-1234567890.0987654321");
private static final BigDecimal DECIMAL_STEP = new BigDecimal("1234567890.0987654321").subtract(DECIMAL_MIN_VALUE)
.divide(new BigDecimal(INT_MAX_VALUE - INT_MIN_VALUE), RoundingMode.HALF_UP);

private MessageType parquetSchema = null;
private BlockMetaData rowGroupMetadata = null;
private DictionaryPageReadStore dictionaryStore = null;
private final WriterVersion writerVersion;

@Rule
public TemporaryFolder temp = new TemporaryFolder();

@Parameterized.Parameters
public static List<WriterVersion> writerVersions() {
return Arrays.asList(PARQUET_1_0, PARQUET_2_0);
}

public TestDictionaryRowGroupFilter(WriterVersion writerVersion) {
this.writerVersion = writerVersion;
}

@Before
public void createInputFile() throws IOException {
File parquetFile = temp.newFile();
@@ -140,6 +170,7 @@ public void createInputFile() throws IOException {
OutputFile outFile = Files.localOutput(parquetFile);
try (FileAppender<Record> appender = Parquet.write(outFile)
.schema(FILE_SCHEMA)
.withWriterVersion(writerVersion)
.build()) {
GenericRecordBuilder builder = new GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
// create 20 copies of each record to ensure dictionary-encoding
@@ -157,6 +188,9 @@
builder.set("_some_nans", (i % 10 == 0) ? Float.NaN : 2F); // includes some nan values
builder.set("_no_nans", 3D); // optional, but always non-nan

// min=-1234567890.0987654321, max~=1234567890.0987654321 (depending on rounding), num-nulls=0
builder.set("_decimal_fixed", DECIMAL_MIN_VALUE.add(DECIMAL_STEP.multiply(new BigDecimal(i))));

Record structNotNull = new Record(structSchema);
structNotNull.put("_int_field", INT_MIN_VALUE + i);
builder.set("_struct_not_null", structNotNull); // struct with int
@@ -892,4 +926,32 @@ public void testTypePromotion() {
.shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
Assert.assertTrue("Should succeed with promoted schema", shouldRead);
}

@Test
public void testFixedLenByteArray() {
// This test validates the handling of the FIXED_LEN_BYTE_ARRAY Parquet type when it is dictionary encoded.
// (There is no need to validate every possible predicate.)

Assume.assumeTrue("decimal_fixed is not dictionary encoded in case of writer version " + writerVersion,
getColumnForName(rowGroupMetadata, "_decimal_fixed").getEncodings().contains(Encoding.RLE_DICTIONARY));

boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA,
greaterThanOrEqual("decimal_fixed", BigDecimal.ZERO)).shouldRead(parquetSchema, rowGroupMetadata,
dictionaryStore);
Assert.assertTrue("Should read: Half of the decimal_fixed values are greater than 0", shouldRead);

shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, lessThan("decimal_fixed", DECIMAL_MIN_VALUE))
.shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
Assert.assertFalse("Should not read: No decimal_fixed values less than -1234567890.0987654321", shouldRead);
}

private ColumnChunkMetaData getColumnForName(BlockMetaData rowGroup, String columnName) {
ColumnPath columnPath = ColumnPath.fromDotString(columnName);
for (ColumnChunkMetaData column : rowGroup.getColumns()) {
if (columnPath.equals(column.getPath())) {
return column;
}
}
throw new NoSuchElementException("No column in rowGroup for the name " + columnName);
}
}
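For readers who want to check which encodings a written file actually used (the same information the Assume check above consumes), here is a minimal standalone sketch against the parquet-hadoop footer APIs; the file path argument is an assumption for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class PrintColumnEncodings {
  public static void main(String[] args) throws Exception {
    // args[0] is assumed to be the path to a local Parquet file.
    Path path = new Path(args[0]);
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          // Prints each column path with the set of encodings used in this row group,
          // e.g. RLE_DICTIONARY for dictionary-encoded chunks.
          System.out.println(column.getPath() + " -> " + column.getEncodings());
        }
      }
    }
  }
}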