Spark: Read/Write UnknownType#13445
Conversation
nit: let's make sure this PR is applied to both Spark 3.5 and 4.0! context: https://lists.apache.org/thread/7x7xcm3y87y81c4grq4nn9gdjd4jm05f
Let's do this first in Spark 3.5, and I'll forward-port it to 4.0 in another PR. There will probably be some comments on the code, and then I have to keep everything in sync (something I'm not good at :).
stevenzwu
left a comment
only got time to look at a few classes. will spend more time next Monday.
This reverts commit c0172a1.
if (null == struct) {
  return IntStream.rangeClosed(0, numWriters).toArray();
}
rangeClosed is inclusive on both ends, so the resulting array will be of size numWriters + 1. Is that right?
i see this is used in a couple of other places as well
https://grep.app/search?f.repo=apache%2Ficeberg&q=IntStream.rangeClosed%280%2C+numWriters%29.toArray%28%29%3B
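Steven's observation about the inclusive upper bound can be checked with a quick standalone sketch (not Iceberg code, just plain java.util.stream):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class RangeCheck {
    public static void main(String[] args) {
        int numWriters = 3;
        // rangeClosed is inclusive on both ends: {0, 1, 2, 3}, size numWriters + 1
        int[] closed = IntStream.rangeClosed(0, numWriters).toArray();
        // range is exclusive on the upper bound: {0, 1, 2}, size numWriters
        int[] open = IntStream.range(0, numWriters).toArray();
        System.out.println(Arrays.toString(closed)); // [0, 1, 2, 3]
        System.out.println(Arrays.toString(open));   // [0, 1, 2]
    }
}
```

So for an identity writer-to-field mapping of size numWriters, `range(0, numWriters)` is the variant that matches the intended `[0, numWriters)` semantics.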
stevenzwu
left a comment
the new approach suggested by Ryan looks pretty good
/** Returns a mapping from writer index to field index, skipping Unknown columns. */
private static int[] writerToFieldIndex(List<DataType> types, int numWriters) {
  if (null == types) {
    return IntStream.rangeClosed(0, numWriters).toArray();
should it be range or rangeClosed? I thought the end should be exclusive, as [0, numWriters).
also if the types list/array is null, should we fail? is it a valid scenario?
I would imagine it can be empty. even that is a bit weird to me.
should it be range or rangeClosed? I thought the end should be exclusive as [0, numWriters).
I think it should be range, but I'd rather follow up on that in a separate PR since there are also similar occurrences in other places.
also if the types list/array is null, should we fail? is it a valid scenario?
I don't think it can be null, but I think we should fail on null == types. To illustrate: this PR addresses Spark, but the Flink writers don't pass in the struct, failing on UnknownTypes. I would also suggest handling this in a follow-up PR.
I would imagine it can be empty. even that is a bit weird to me.
I think that's the case of the empty struct, which is not allowed in Parquet
sounds good to follow up separately
created tracking issue for range vs rangeClosed
#13921
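To make the intended behavior concrete, here is a self-contained sketch of a writer-to-field index mapping that uses the exclusive-bound `range` and skips null-typed fields. This is illustrative only, not the Iceberg source: field types are modeled as plain strings here, whereas the real method takes a `List<DataType>` and checks for Spark's NullType.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class WriterIndexSketch {
    // Hypothetical stand-in for the real writerToFieldIndex: "null" marks a
    // field backed by NullType (i.e. an Iceberg UnknownType column) that has
    // no writer and must be skipped in the mapping.
    static int[] writerToFieldIndex(List<String> types, int numWriters) {
        if (types == null) {
            // identity mapping: writer i writes field i, using the
            // exclusive upper bound [0, numWriters)
            return IntStream.range(0, numWriters).toArray();
        }
        List<Integer> mapping = new ArrayList<>();
        for (int field = 0; field < types.size(); field++) {
            if (!"null".equals(types.get(field))) {
                mapping.add(field); // writer index -> field index
            }
        }
        return mapping.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // fields [int, null, string]: two writers, mapped to fields 0 and 2
        int[] m = writerToFieldIndex(List.of("int", "null", "string"), 2);
        System.out.println(Arrays.toString(m)); // [0, 2]
    }
}
```

With this shape, writer 1 correctly targets field index 2, stepping over the unknown column in the middle.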
  sField.name());
  results.add(visitField(sField, field, visitor));
for (StructField sField : struct.fields()) {
  String fieldName = AvroSchemaUtil.makeCompatibleName(sField.name());
@Fokko @rdblue will using field name be a problem if the field is renamed in Iceberg table? would the mapping be messed up during read or write?
During write, if both the Spark type and the Parquet type are converted from the Iceberg schema, this should be fine. But if we are reading a Parquet file with the old field name, will this break?
Got the answer from Fokko offline: "The ReadBuilder does not use the visitor, and follows a different path which uses projection by ID."
Yes, that's correct. The read path doesn't use this.
Map Iceberg's UnknownType to Spark's NullType in both directions (TypeToSparkType: UNKNOWN -> NullType; SparkTypeToType: NullType -> UnknownType). Filter NullType-backed fields from Parquet/ORC writers so UnknownType columns produce only nulls. Aligns Spark 3.x with the existing Spark 4.x behavior from apache#13445.

Tests added (mirrored from apache#13445 to v3.4 and v3.5):
- AvroDataTestBase: unk field in SUPPORTED_PRIMITIVES, plus testUnknownNestedLevel, testUnknownListType, testUnknownMapType
- TestSparkOrcReader: testUnknownListType, testUnknownMapType overrides
- TestSparkParquetReader: testUnknownListType, testUnknownMapType overrides
- TestSparkRecordOrcReaderWriter: testUnknownListType, testUnknownMapType overrides
- TestORCDataFrameWrite: testUnknownListType, testUnknownMapType overrides
- TestParquetDataFrameWrite: testUnknownListType, testUnknownMapType overrides
- TestParquetScan: testUnknownListType, testUnknownMapType overrides
- ScanTestBase.writeAndValidate: opt into format version 3 when the schema contains UnknownType
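The bidirectional mapping described above can be sketched as a minimal round-trip table. This is a toy model, not the Iceberg visitors: the real conversion lives in TypeToSparkType and SparkTypeToType, and the enum names here are placeholders for the actual type classes.

```java
import java.util.Map;

public class UnknownTypeMapping {
    // Placeholder enums standing in for Iceberg types and Spark DataTypes.
    enum IcebergType { UNKNOWN, INTEGER, STRING }
    enum SparkType { NULL, INTEGER, STRING }

    // Iceberg -> Spark direction (the TypeToSparkType side): UNKNOWN maps to NullType.
    static final Map<IcebergType, SparkType> TO_SPARK = Map.of(
        IcebergType.UNKNOWN, SparkType.NULL,
        IcebergType.INTEGER, SparkType.INTEGER,
        IcebergType.STRING, SparkType.STRING);

    // Spark -> Iceberg direction (the SparkTypeToType side): NullType maps back to UNKNOWN.
    static final Map<SparkType, IcebergType> TO_ICEBERG = Map.of(
        SparkType.NULL, IcebergType.UNKNOWN,
        SparkType.INTEGER, IcebergType.INTEGER,
        SparkType.STRING, IcebergType.STRING);

    public static void main(String[] args) {
        // round-trip: UNKNOWN -> NULL -> UNKNOWN
        System.out.println(TO_ICEBERG.get(TO_SPARK.get(IcebergType.UNKNOWN)));
    }
}
```

The round-trip property is what keeps schema conversion lossless: converting an Iceberg schema to Spark and back must not drop or change the unknown columns, even though their writers are filtered out.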
cc @ala
Related dev-thread: https://lists.apache.org/thread/gq9lyndb574ptq7vkz83zgkp1lx7vp5x