Flink: bridge the gap btw FlinkSource and IcebergSource (FLIP-27) and… #5318

Merged
rdblue merged 2 commits into apache:master from stevenzwu:flip27SQL
Jul 28, 2022

@stevenzwu
Contributor

… added an opt-in config to use FLIP-27 source in Flink SQL

*/

- package org.apache.iceberg.flink;
+ package org.apache.iceberg.flink.source;
Contributor Author

Moved this class into the source package so that it can use some package-private methods from IcebergSource. It also seems like a good home for this class.

source, WatermarkStrategy.noWatermarks(), source.name(), TypeInformation.of(RowData.class));

if (source.getBoundedness() == Boundedness.BOUNDED) {
int parallelism = SourceUtil.inferParallelism(readableConfig, limit, () -> source.planSplitsForBatch().size());
Contributor Author

The expensive planSplitsForBatch lambda is executed only if parallelism inference is enabled in the config.
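A minimal sketch of the lazy-evaluation idea, assuming a plain `Supplier<Integer>` for the split count. The class name, the boolean flag, and the default/max parallelism parameters are hypothetical stand-ins for illustration; this is not the actual SourceUtil code, which reads its settings from the Flink config.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the helper discussed above; not the actual SourceUtil code.
class InferParallelismSketch {

  /**
   * Returns the parallelism for a bounded source.
   * The splitCount supplier is invoked only when inference is enabled.
   */
  static int inferParallelism(
      boolean inferEnabled,
      int defaultParallelism,
      int maxParallelism,
      long limit,
      Supplier<Integer> splitCount) {
    int parallelism = defaultParallelism;
    if (inferEnabled) {
      // The expensive planning call happens only on this branch.
      parallelism = Math.min(splitCount.get(), maxParallelism);
    }
    if (limit > 0) {
      // A query with LIMIT n never needs more than n readers.
      int cappedLimit = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      parallelism = Math.min(parallelism, cappedLimit);
    }
    // Always run with at least one reader.
    return Math.max(1, parallelism);
  }

  public static void main(String[] args) {
    List<String> splits = List.of("s0", "s1", "s2", "s3");
    // A non-positive limit means "no limit" in this sketch.
    System.out.println(inferParallelism(true, 2, 100, -1L, splits::size));
    System.out.println(inferParallelism(false, 2, 100, -1L, splits::size));
  }
}
```

Passing the split count as a supplier rather than a value is what keeps the disabled path cheap: the caller can hand over a planning lambda without paying for it unless the flag is on.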

}
}

int inferParallelism(FlinkInputFormat format, ScanContext context) {
Contributor Author

The following two methods are refactored into a new class, SourceUtil, so that the FLIP-27 IcebergSource can reuse them.

@Override
public void start() {
super.start();
if (shouldEnumerate) {
Contributor Author

Split discovery for the static/batch enumerator is now performed in IcebergSource (line 183). This consolidates the batch planning into a single class.

import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class SqlHelpers {
Contributor Author

Utility methods extracted from TestFlinkScanSql.

}

@Before
public void before() throws IOException {
Contributor Author

This is refactored into HadoopCatalogResource so that it is reusable (e.g. by the new TestSqlBase).

}

@Test
public void testResiduals() throws Exception {
Contributor Author

The remaining test methods are refactored into TestSqlBase so that the tests are shared between the current and the new FLIP-27 sources.

*/

- package org.apache.iceberg.flink;
+ package org.apache.iceberg.flink.source;
Contributor Author

Moved to the source package to be consistent with the move of the IcebergTableSource class.

Contributor

@rdblue rdblue left a comment

Looks good! Let me know whether we need the infer parallelism changes and I'll merge.

@stevenzwu
Contributor Author

stevenzwu commented Jul 22, 2022

@zhangjun0x01 @openinx @yittg I'd like to get your input on the parallelism inference feature. The current implementation in FlinkSource requires planning splits twice: (1) once to get the splits and derive the split count, and (2) again for split planning in the source. This is obviously not ideal, so @rdblue and I are wondering how useful this feature is. Do we need to carry it over to the FLIP-27 source? The main question concerns the additional split planning; the other parts of parallelism inference (like the limit count) should be fine.

For now, I am going to exclude it from this PR. If we decide that we should carry it over, I can follow up with a separate PR.
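To make the cost concrete, here is a small sketch of the double planning described above. All names are hypothetical and the planner is a stand-in that just counts its invocations; it only illustrates that inferring parallelism from the split count adds one extra planning pass before the source plans its own splits.

```java
import java.util.List;

// Hypothetical sketch; none of these names come from the Iceberg code base.
class DoublePlanningSketch {
  private int planningCalls = 0;

  // Stand-in for an expensive scan-planning call.
  List<String> planSplits() {
    planningCalls++;
    return List.of("split-0", "split-1", "split-2");
  }

  // Optionally infers parallelism first, then plans splits for the actual read.
  int buildAndRun(boolean inferParallelism) {
    int parallelism = 1;
    if (inferParallelism) {
      // First planning pass: only the split count is used.
      parallelism = planSplits().size();
    }
    // Second planning pass: the source plans the splits it will read.
    List<String> splits = planSplits();
    return Math.min(parallelism, splits.size());
  }

  int planningCalls() {
    return planningCalls;
  }
}
```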

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 22, 2022
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.table = table;
Contributor

I thought we were going to avoid these changes for now, since we don't know whether they will be needed to infer parallelism?

Contributor Author

@stevenzwu stevenzwu Jul 26, 2022

We are avoiding the feature of inferring parallelism. But I think this refactoring is still good.

  • It avoids loading the table twice. A Table is loaded in the builder to get fields like the schema, io, and encryption. Otherwise it would be loaded again in the IcebergSource#createEnumerator method, which also runs in the jobmanager/driver.
  • table/lazyTable() is used by the name() getter.
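The two points above can be sketched roughly as follows. This is an illustration under assumptions, not the IcebergSource implementation: `Table`, `Source`, and the `Supplier`-based loader are simplified hypothetical types, and only the idea of handing the already-loaded table to the source and falling back to a lazy load is taken from the comment.

```java
import java.util.function.Supplier;

// Hypothetical, simplified types for illustration only.
class LazyTableSketch {

  static final class Table {
    private final String name;

    Table(String name) {
      this.name = name;
    }

    String name() {
      return name;
    }
  }

  static final class Source {
    private final Supplier<Table> tableLoader;
    // May be null, e.g. when the source was not given a loaded table.
    private transient Table table;

    Source(Supplier<Table> tableLoader, Table alreadyLoaded) {
      this.tableLoader = tableLoader;
      this.table = alreadyLoaded;
    }

    // Loads the table at most once and reuses it afterwards.
    Table lazyTable() {
      if (table == null) {
        table = tableLoader.get();
      }
      return table;
    }

    // Both callers share the same table instance instead of reloading it.
    String name() {
      return "IcebergSource-" + lazyTable().name();
    }

    String createEnumerator() {
      return "enumerator for " + lazyTable().name();
    }
  }
}
```

A builder that has already loaded the table for schema and io can pass it straight to the constructor, so later calls to `name()` or `createEnumerator()` do not trigger another load.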

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 27, 2022
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 27, 2022
@rdblue
Contributor

rdblue commented Jul 27, 2022

@stevenzwu, can you rebase?

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 27, 2022
@rdblue
Contributor

rdblue commented Jul 27, 2022

Looks good to me! I'll merge when tests are passing.

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 27, 2022
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 28, 2022
@stevenzwu stevenzwu closed this Jul 28, 2022
@stevenzwu stevenzwu reopened this Jul 28, 2022
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 28, 2022
@stevenzwu
Contributor Author

stevenzwu commented Jul 28, 2022

@rdblue this is ready to be merged. I reviewed the diff again and it looks good after the rebase.

The previous two CI runs failed due to flakiness caused by congested CI machines (probably overwhelmed by the many PRs rebased after the big-bang spotlessApply commit). One run failed in TestS3OutputStream. The other failed in the FLIP-27 TestIcebergSourceFailover because it wasn't able to make enough progress after 2 minutes of waiting.

@rdblue rdblue merged commit 23c9345 into apache:master Jul 28, 2022