Flink: bridge the gap between FlinkSource and IcebergSource (FLIP-27) and… #5318
Conversation
- package org.apache.iceberg.flink;
+ package org.apache.iceberg.flink.source;
Moved this class into the source dir so that it can use some package-private methods from IcebergSource. It also seems like a good home for this class.
source, WatermarkStrategy.noWatermarks(), source.name(), TypeInformation.of(RowData.class));

if (source.getBoundedness() == Boundedness.BOUNDED) {
  int parallelism = SourceUtil.inferParallelism(readableConfig, limit, () -> source.planSplitsForBatch().size());
The expensive planSplitsForBatch() lambda will only be executed if the infer-parallelism config is enabled.
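The laziness described above can be sketched with a plain Supplier: the split count is passed as a deferred computation, so the expensive planning call only happens when inference is turned on. This is a simplified, hypothetical illustration, not the actual SourceUtil signature; the config handling and clamping rules here are assumptions.

```java
import java.util.function.Supplier;

// Hedged sketch: the split count comes in as a Supplier so that the expensive
// planSplitsForBatch()-style call only runs when inference is enabled.
public class ParallelismInference {
  public static int inferParallelism(
      boolean inferEnabled, int maxParallelism, long limit, Supplier<Integer> splitCountSupplier) {
    int parallelism = maxParallelism;
    if (inferEnabled) {
      // Only here is the supplier evaluated, triggering split planning.
      parallelism = Math.min(splitCountSupplier.get(), maxParallelism);
    }
    if (limit > 0) {
      // No point running more parallel readers than the row limit requires.
      parallelism = Math.min(parallelism, (int) Math.min(limit, Integer.MAX_VALUE));
    }
    return Math.max(1, parallelism);
  }
}
```

With inference disabled, the supplier is never invoked, which is exactly why passing a lambda instead of an eagerly computed split count matters here.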
  }
}

int inferParallelism(FlinkInputFormat format, ScanContext context)
The following two methods are refactored into a new class, SourceUtil, so that the FLIP-27 IcebergSource can reuse them.
@Override
public void start() {
  super.start();
  if (shouldEnumerate) {
Split discovery for the static/batch enumerator is now performed in IcebergSource (line 183). This consolidates batch planning into a single class.
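The division of labor above can be sketched as follows: a bounded source plans all splits once and hands them to the enumerator, while only a streaming enumerator (shouldEnumerate == true) kicks off further discovery in start(). The classes and names below are simplified stand-ins for the Flink/Iceberg ones, not the real API.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Illustrative sketch: batch splits arrive pre-planned from the source; the
// enumerator only performs discovery itself in the streaming case.
public class SplitEnumeratorSketch {
  private final Queue<String> pendingSplits = new ArrayDeque<>();
  private final boolean shouldEnumerate;  // false for static/batch enumeration
  private int discoveryCycles = 0;

  public SplitEnumeratorSketch(List<String> preplannedSplits, boolean shouldEnumerate) {
    this.pendingSplits.addAll(preplannedSplits);  // batch splits planned by the source
    this.shouldEnumerate = shouldEnumerate;
  }

  public void start() {
    if (shouldEnumerate) {
      // Streaming case: the enumerator schedules continuous split discovery.
      discoverSplits();
    }
    // Batch case: nothing to do here; planning already happened in the source.
  }

  private void discoverSplits() {
    discoveryCycles++;
    pendingSplits.add("discovered-split-" + discoveryCycles);
  }

  public int pendingSplitCount() { return pendingSplits.size(); }
  public int discoveryCycles() { return discoveryCycles; }
}
```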
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class SqlHelpers {
Util methods extracted from TestFlinkScanSql.
}

@Before
public void before() throws IOException {
This is refactored into HadoopCatalogResource to be reusable (e.g. by the new TestSqlBase).
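The refactoring idea is to pull shared @Before/@After setup into a self-contained resource object (in the spirit of JUnit's ExternalResource) that multiple test classes can reuse. The class below is a simplified, hypothetical stand-in, not the actual HadoopCatalogResource; it just shows the before()/after() lifecycle shape.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of a reusable test resource: setup in before(), teardown in after(),
// so each test class no longer needs its own inline warehouse setup.
public class WarehouseResource {
  private Path warehouse;

  public void before() throws IOException {
    // Previously done inline in each test class's @Before method.
    warehouse = Files.createTempDirectory("iceberg-warehouse");
  }

  public void after() throws IOException {
    Files.deleteIfExists(warehouse);
  }

  public Path warehouse() {
    return warehouse;
  }
}
```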
}

@Test
public void testResiduals() throws Exception {
The remaining test methods are refactored into TestSqlBase to share the tests between the current and the new FLIP-27 sources.
- package org.apache.iceberg.flink;
+ package org.apache.iceberg.flink.source;
Moved to the source dir to be consistent with the move of the IcebergTableSource class.
rdblue left a comment:
Looks good! Let me know whether we need the infer parallelism changes and I'll merge.
@zhangjun0x01 @openinx @yittg I'd like to get your input on the infer-parallelism feature. The current implementation in … For now, I am going to exclude it in this PR. If we decide that we should carry it over, I can follow up with a separate PR.
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.table = table;
I thought we were going to avoid these changes for now, since we don't know whether they will be needed to infer parallelism?
We are avoiding the feature of inferring parallelism, but I think this refactoring is still good:
- It avoids double loading of the table. A Table is loaded in the builder to get fields like schema, io, encryption, etc. It would otherwise be loaded again in the IcebergSource#createEnumerator method, which also runs in the jobmanager/driver.
- table/lazyTable() is used by the name() getter.
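The pattern described in the two points above can be sketched like this: the builder may hand over an already-loaded table, and lazyTable() only invokes the loader when nothing was supplied, so the table is loaded at most once. The types and names below are simplified assumptions (a String stands in for Table, a Supplier for TableLoader); this is not the actual IcebergSource code.

```java
import java.util.function.Supplier;

// Sketch of lazy, load-at-most-once table access reused by name().
public class LazyTableHolder {
  private final Supplier<String> loader;  // stand-in for a TableLoader
  private String table;                   // may already be set by the builder
  private int loadCount = 0;

  public LazyTableHolder(String preloaded, Supplier<String> loader) {
    this.table = preloaded;
    this.loader = loader;
  }

  // Loads only if the builder did not already supply a table.
  public String lazyTable() {
    if (table == null) {
      loadCount++;
      table = loader.get();
    }
    return table;
  }

  // Mirrors the name() getter mentioned above: it relies on lazyTable().
  public String name() {
    return "iceberg[" + lazyTable() + "]";
  }

  public int loadCount() { return loadCount; }
}
```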
@stevenzwu, can you rebase?
… added an opt-in config to use FLIP-27 source in Flink SQL
Looks good to me! I'll merge when tests are passing.
…tion order on expected vs actual
@rdblue this is ready to be merged. I reviewed the diff again and it looks good after the rebase. The previous two CI runs failed due to some flakiness caused by congestion of the CI machines (probably overwhelmed by the many PRs rebased after the big-bang spotlessApply commit). Once it failed in the …