Flink: Add support for Flink 2.0#12527
Conversation
|
CC @pvary @stevenzwu |
|
@mxm: We did it a bit differently in the past. See: #10881 This would mean here to move Flink 1.20 to Flink 2.0, and then in another PR copy back the original Flink 1.20 code. The result is that the history of the main Flink branch is easier to read. |
| @Override | ||
| protected void setup( | ||
| StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output) { | ||
| super.setup(containingTask, config, output); | ||
| } | ||
|
|
There was a problem hiding this comment.
Why do we need this override?
There was a problem hiding this comment.
I forgot to add a comment here, but the reason is the same as in https://github.com/apache/iceberg/pull/12527/files/7eecf7fa428efbc7dd958d47e8cd74657d722541..35078bdc25647849afb06c58cdfbc5dda0c61830#diff-65fd76701c7646960de606de2cfe6ab6ba5f2eceb0ac66ca25ad321180127998R96
Changed.
There was a problem hiding this comment.
The reason is that the factory calls the setup method and the visibility has changed which requires an override here.
There was a problem hiding this comment.
you are saying AbstractStreamOperator changed the scope of this method from public to protected. can you share which operator factory still calls this method?
If I understand this correctly, it doesn't seem to be a right change from Flink side.
There was a problem hiding this comment.
It is called from here:
There was a problem hiding this comment.
that was StreamingReadOperator. this is IcebergFilesCommitter. don't quite see the connection here.
There was a problem hiding this comment.
True, that was an example for a different operator. Generally, calling setup is required for all operators to initialize the task, its config, and its output. Flink moved away from an explicit call to calling this in the constructor of the base class: https://github.com/apache/flink/blob/b0607a15e62b664d15efbda0b0e991f72e45a467/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L166 We are not calling the super constructor, so we need to manually call the setup method.
Looking at the code base, Flink 2.0 seems to favor creating a StreamOperatorFactory. I've updated the code to use a factory and call the super constructor instead. The setup method overrides are now removed.
|
I think it would be good to decide on the general question:
While it is good to test out things, I'm not entirely sure we want to run forward supporting a preview version. Others from the community could have a different opinion, but it might be good to ask the users on the dev list |
|
|
||
| /** | ||
| * Override Flink's internal FlinkScalaKryoInstantiator to avoid loading the Scala extensions for | ||
| * the KryoSerializer. This is a workaround until Kryo-related issues with the Scala extensions are |
There was a problem hiding this comment.
is there any Flink jira we can link here for the follow-up?
There was a problem hiding this comment.
Yes, there is this ticket: https://issues.apache.org/jira/browse/FLINK-37546
I've added it to the comment.
| @Override | ||
| protected void setup( | ||
| StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output) { | ||
| super.setup(containingTask, config, output); | ||
| } | ||
|
|
There was a problem hiding this comment.
that was StreamingReadOperator. this is IcebergFilesCommitter. don't quite see the connection here.
|
@stevenzwu Thanks for the review! I squashed the commits. |
|
(I renamed the Flink 2.0 commit to "Flink: Add support for Flink 2.0") |
|
thanks @mxm for the contribution and @pvary @ajantha-bhat for the reviews |
|
Is there a fourth commit that removed 1.18 support? |
|
@manuzhang Yes, there is. |
|
Thanks for reviewing / merging @stevenzwu. |
|
when is the 2.0 release coming? |
|
@le5hy it will be released in 1.10.0 which is targeted for end of June if not postponed. |
This adds support for running Iceberg with Flink 2.0. No major changes, but there are several interfaces removed or moved to legacy packages. This posed the question whether to remove some older implementations of these interfaces, e.g. FlinkSource and FlinkSink, and replace them with newer ones, e.g. IcebergSource and IcebergSink.
After a bit of investigation and further discussions on the mailing list, we decided to leverage the legacy packages and keep support for FlinkSource and FlinkSink. There is an ongoing discussion when to remove them.
To ease the review, it is broken down into several commits: