Flink 1.20: Support Avro and Parquet timestamp(9), unknown, and defaults#12470
Conversation
I'll also follow up with a PR for Parquet readers, but that depends on changes in #12463.
#12463 was merged and the changes for Parquet were small, so I included them here.
```java
} else {
  return Optional.of(new MicrosToTimestampReader(desc));
}
return Optional.of(new MicrosToTimestampReader(desc));
```
Previously, the readers converted values to LocalDateTime or OffsetDateTime, and Flink would then convert those values back to a (millis, nanosOfMilli) pair. This involved a lot of unnecessary date/time logic in both Iceberg and Flink, as well as separate reader classes to produce each type.
Now the conversion to Flink is direct and doesn't go through the Java date/time classes. That avoids all time zone calculations and should be faster.
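As a rough illustration (not the PR's actual reader code), the direct conversion is plain integer math: an epoch-micros value splits into the (millis, nanosOfMillisecond) pair that Flink's `TimestampData` stores internally, with no `LocalDateTime`/`OffsetDateTime` round trip. `Math.floorDiv`/`Math.floorMod` keep pre-1970 (negative) timestamps correct:

```java
// Sketch only: splitting epoch micros into Flink's internal
// (millis, nanosOfMillisecond) pair. Class/method names are illustrative.
public class MicrosSplit {
  static long millis(long micros) {
    // floorDiv rounds toward negative infinity, so -1 micros -> -1 millis
    return Math.floorDiv(micros, 1000L);
  }

  static int nanosOfMilli(long micros) {
    // remainder in [0, 999] micros, scaled to nanoseconds
    return (int) Math.floorMod(micros, 1000L) * 1000;
  }

  public static void main(String[] args) {
    long micros = -1L; // 1 microsecond before the epoch
    System.out.println(millis(micros) + "," + nanosOfMilli(micros)); // prints -1,999000
  }
}
```

The two results recombine exactly: millis * 1_000_000 + nanosOfMilli == micros * 1000, which is what makes the conversion lossless for timestamp(9) storage in millis-plus-nanos form.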
```java
LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
if (annotation != null) {
  Optional<ParquetValueWriter<?>> writer =
      annotation.accept(new LogicalTypeWriterBuilder(fType, desc));
```
Updated this to use the logical annotation visitor.
```java
  }
  pos += 1;
}

```
nit: this is a bit strange. Could we fix the empty lines?
For dense control flow blocks, we often leave extra newlines to make it more readable.
```java
public void testNumericTypes() throws IOException {
  List<Record> expected =
      ImmutableList.of(
          recordNumType(
              2,
              Integer.MAX_VALUE,
              Float.MAX_VALUE,
              Double.MAX_VALUE,
              Long.MAX_VALUE,
              1643811742000L,
              1643811742000L,
              1643811742000L,
              10.24d),
          recordNumType(
              2,
              Integer.MIN_VALUE,
              Float.MIN_VALUE,
              Double.MIN_VALUE,
              Long.MIN_VALUE,
              1643811742000L,
              1643811742000L,
              1643811742000L,
              10.24d));

  writeAndValidate(SCHEMA_NUM_TYPE, expected);
```
This might be an interesting edge case to test in the TestData: min/max values for different columns. It could be especially interesting for timestamps, dates, etc.
These cases are tested in the random data generator, so this is duplication.
Thanks for reviewing, @pvary and @danielcweeks!
This updates Flink's Avro and Parquet readers to support new timestamp(9) and unknown types.
While enabling the `DataTest` cases, I found that `supportsDefaultValues` was not enabled, so the default value tests were not running for Avro. After enabling those tests, I also needed to update the `RowData` assertions and convert default values to match Flink's object model in the readers by calling `RowDataUtil.convertConstant`.