Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVecto
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return new DictionaryBinaryAccessor<>((IntVector) vector, dictionary);
return new DictionaryBinaryAccessor<>(
(IntVector) vector, dictionary, stringFactorySupplier.get());
case FLOAT:
return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
case INT64:
Expand Down Expand Up @@ -452,17 +453,27 @@ private static class DictionaryBinaryAccessor<
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
private final IntVector offsetVector;
private final Dictionary dictionary;
private final StringFactory<Utf8StringT> stringFactory;

DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
/**
 * Creates an accessor over a dictionary-encoded binary column.
 *
 * @param vector the vector of dictionary ids (one id per row)
 * @param dictionary the Parquet dictionary used to decode ids back to binary values
 * @param stringFactory optional factory for building UTF8 strings directly from the
 *     dictionary; may be null, in which case the superclass conversion is used
 */
DictionaryBinaryAccessor(
    IntVector vector, Dictionary dictionary, StringFactory<Utf8StringT> stringFactory) {
  super(vector);
  this.offsetVector = vector;
  this.dictionary = dictionary;
  this.stringFactory = stringFactory;
}

@Override
public final byte[] getBinary(int rowId) {
  // Resolve the row's dictionary id first, then materialize the decoded bytes.
  int dictionaryId = offsetVector.get(rowId);
  return dictionary.decodeToBinary(dictionaryId).getBytes();
}

@Override
public Utf8StringT getUTF8String(int rowId) {
  // Without a custom string factory, defer to the superclass conversion path.
  if (stringFactory == null) {
    return super.getUTF8String(rowId);
  }
  // Let the factory build the string straight from the dictionary-encoded value.
  return stringFactory.ofRow(offsetVector, dictionary, rowId);
}
}

private static class DictionaryTimestampInt96Accessor<
Expand Down Expand Up @@ -815,6 +826,13 @@ default Utf8StringT ofRow(FixedSizeBinaryVector vector, int rowId) {
getGenericClass().getSimpleName()));
}

/** Create a UTF8 String from the row value in the Dictionary. */
default Utf8StringT ofRow(IntVector offsetVector, Dictionary dictionary, int rowId) {
  // Default implementation: implementors that support dictionary decoding override this.
  String message =
      String.format(
          "Creating %s from a Dictionary is not supported", getGenericClass().getSimpleName());
  throw new UnsupportedOperationException(message);
}

/** Create a UTF8 String from the byte array. */
Utf8StringT ofBytes(byte[] bytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.nio.ByteBuffer;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.Dictionary;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
Expand Down Expand Up @@ -81,6 +83,12 @@ public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
}

@Override
public UTF8String ofRow(IntVector offsetVector, Dictionary dictionary, int rowId) {
  // Decode the dictionary-encoded UUID bytes, then render the canonical string form.
  int dictionaryId = offsetVector.get(rowId);
  byte[] uuidBytes = dictionary.decodeToBinary(dictionaryId).getBytes();
  return UTF8String.fromString(UUIDUtil.convert(uuidBytes).toString());
}

@Override
public UTF8String ofBytes(byte[] bytes) {
return UTF8String.fromBytes(bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,19 @@ public void testUnsupportedReadsForParquetV2() throws Exception {
.hasMessageStartingWith("Cannot support vectorized reads for column")
.hasMessageEndingWith("Disable vectorized reads to read this table/file");
}

@Test
public void testUuidReads() throws Exception {
// Just one row to maintain dictionary encoding
int numRows = 1;
Schema schema = new Schema(optional(100, "uuid", Types.UUIDType.get()));

File dataFile = File.createTempFile("junit", null, temp.toFile());
assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this? The test looks fine; I'm just wondering why we create and then delete the file.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are multiple tests that use the same directory. Removing this line causes:

> Task :iceberg-spark:iceberg-spark-3.5_2.12:test FAILED
TestParquetVectorizedReads > testUuidReads() FAILED
    org.apache.iceberg.exceptions.AlreadyExistsException: File already exists: /var/folders/q_/gj3w0fts15n9l1dz58630jg40000gp/T/junit-17282529623949094801/junit6226812078387717274.tmp
        at app//org.apache.iceberg.Files$LocalOutputFile.create(Files.java:56)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created an issue to fix this for all the tests: #13506

Copy link
Copy Markdown
Contributor

@liamzwbao liamzwbao Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a look at the issue. @TempDir makes sure each test gets a unique directory, so multiple tests won't affect each other. However, the reason these tests need to manually delete the directory is that @TempDir always creates the directory before the test starts, and getParquetWriter throws an exception if the target folder already exists.

What we actually need here is just a randomly generated path without creating anything at that location. Unfortunately, @TempDir doesn’t currently support that level of control.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a JUnit expert @liamzwbao, but this blog suggests that you can also recreate the directory after each test

Iterable<Record> data = generateData(schema, numRows, 0L, 0, IDENTITY);
try (FileAppender<Record> writer = getParquetV2Writer(schema, dataFile)) {
writer.addAll(data);
}
assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE);
}
}