Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit #3817
Conversation
Force-pushed from af027b1 to 9e02d1a
This looks like it is getting close. I'd like @openinx, @stevenzwu, or @kbendick to comment on how this should be configured, though.
    } else {
      contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
    }
    contextBuilder.exposeLocality(localityEnabled());
I think it should be possible to override exposeLocality in this builder so that you can set it differently for different sources. Keeping a boolean in this builder and passing that as an override for the environment property in localityEnabled() should work.
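The override pattern being suggested might look like the following sketch. This is a simplified stand-in, not the real FlinkSource builder: the class name, the config map, and the config key string are assumptions here; the real code reads the Flink table configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested pattern: the builder keeps a nullable Boolean
// override, and localityEnabled() falls back to the environment property
// only when no explicit value was set on this builder.
public class SourceBuilderSketch {
  // Stand-in for the Flink table config (table.exec.iceberg.expose-split-locality-info).
  private final Map<String, Boolean> flinkConf = new HashMap<>();
  // null means "not set explicitly" -> fall back to the config.
  private Boolean exposeLocality;

  public SourceBuilderSketch exposeLocality(boolean newExposeLocality) {
    this.exposeLocality = newExposeLocality;
    return this;
  }

  public SourceBuilderSketch setConf(String key, boolean value) {
    flinkConf.put(key, value);
    return this;
  }

  // The explicit builder value overrides the environment property.
  boolean localityEnabled() {
    if (exposeLocality != null) {
      return exposeLocality;
    }
    return flinkConf.getOrDefault("table.exec.iceberg.expose-split-locality-info", true);
  }

  public static void main(String[] args) {
    SourceBuilderSketch b = new SourceBuilderSketch();
    System.out.println(b.localityEnabled());                       // default from config: true
    b.setConf("table.exec.iceberg.expose-split-locality-info", false);
    System.out.println(b.localityEnabled());                       // config says: false
    System.out.println(b.exposeLocality(true).localityEnabled());  // explicit override wins: true
  }
}
```

With this shape, different sources built from the same environment can each set locality independently, while unset builders still honor the global config.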
Please review whether this meets expectations, thanks.
    }

    public Builder exposeLocality(boolean newExposeLocality) {
      contextBuilder.exposeLocality(newExposeLocality);
We set exposeLocality on contextBuilder in the buildFormat method, so we probably don't need to do it here.
    }

    @Test
    public void testExposeLocality() throws Exception {
This only verifies the data read; it doesn't really verify locality-aware assignment. Ideally, we would need 2 files stored on 2 hosts with HDFS and a cluster of TMs running on those two hosts. Then we could verify that the assigned files/splits come from the same host. But I am not sure this can be done in a unit test setup.
Yes, +1, ideally it would, but I haven't found a way to achieve that. So here we only test that reads work properly when table.exec.iceberg.expose-split-locality-info is set to false.
@hililiwei How does Flink code base test this feature?
It seems to be tested by manually specifying the hostnames; see https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
Another option would be to test it by introducing MiniDFS, but the project doesn't seem willing to take on that dependency.
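The "manually specifying the hostname" approach can be sketched as below. This is a toy model of locality-aware assignment in the spirit of Flink's LocatableSplitAssignerTest, not the real Flink classes: Split, assign, and the host names are all assumptions made here for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of locality-aware split assignment: each split declares the
// hosts that hold its data, and a requesting host is preferentially given
// a split stored locally.
public class LocalityAssignmentSketch {
  static class Split {
    final int id;
    final Set<String> hosts;

    Split(int id, String... hosts) {
      this.id = id;
      this.hosts = new HashSet<>(Arrays.asList(hosts));
    }
  }

  // Prefer a split whose declared hosts contain the requesting host;
  // otherwise fall back to any remaining (remote) split.
  static Split assign(List<Split> remaining, String requestingHost) {
    for (Split s : remaining) {
      if (s.hosts.contains(requestingHost)) {
        remaining.remove(s);
        return s;
      }
    }
    return remaining.isEmpty() ? null : remaining.remove(0);
  }

  public static void main(String[] args) {
    List<Split> splits = new ArrayList<>(Arrays.asList(
        new Split(1, "host-a"), new Split(2, "host-b")));
    // Each host should be handed the split that is local to it.
    System.out.println(assign(splits, "host-b").id); // 2
    System.out.println(assign(splits, "host-a").id); // 1
  }
}
```

A unit test built this way can assert local assignment without any real HDFS cluster, which is exactly what the Flink test does with hand-written hostnames.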
That test is for unit-testing the assigner, not an e2e test of the whole thing. Except for the lack of an e2e test, the PR overall looks good to me. Have you tested this manually in a Hadoop cluster setup?
    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);

    List<Row> results = sql("select * from t");
    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
Fully qualified to avoid a conflict with org.apache.iceberg.TestHelpers.
Force-pushed from a663f13 to 0c14e19
    } else {
      contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
    }
    contextBuilder.exposeLocality(localityEnabled());
Where is the exposeLocality variable used? If an explicit value is passed to this builder, it should be passed into localityEnabled() so that method can use the setting, but only if the underlying file system is hdfs.
It is used here to determine whether to fall back to the Flink config.
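The shape being asked for might look like this sketch. The class and field names are hypothetical; in particular, the file-system scheme check is a simplified stand-in for inspecting the table's actual Hadoop storage.

```java
// Sketch: an explicit builder value takes precedence over the Flink config,
// but locality is only enabled at all when the underlying file system is
// HDFS, since other stores have no block host information to expose.
public class LocalityCheckSketch {
  private final String fileSystemScheme;  // e.g. "hdfs", "s3", "file" (stand-in)
  private Boolean exposeLocality;         // null = use the config default
  private boolean confDefault = true;     // stand-in for the Flink config value

  LocalityCheckSketch(String fileSystemScheme) {
    this.fileSystemScheme = fileSystemScheme;
  }

  LocalityCheckSketch exposeLocality(boolean value) {
    this.exposeLocality = value;
    return this;
  }

  boolean localityEnabled() {
    // Locality info is only meaningful on HDFS.
    if (!"hdfs".equals(fileSystemScheme)) {
      return false;
    }
    return exposeLocality != null ? exposeLocality : confDefault;
  }

  public static void main(String[] args) {
    System.out.println(new LocalityCheckSketch("s3").exposeLocality(true).localityEnabled());    // false: not HDFS
    System.out.println(new LocalityCheckSketch("hdfs").localityEnabled());                       // true: config default
    System.out.println(new LocalityCheckSketch("hdfs").exposeLocality(false).localityEnabled()); // false: explicit
  }
}
```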
Force-pushed from be3c954 to 52808f5
      return parallelism;
    }

    boolean localityEnabled() {
Done. The test case was also modified.
rdblue left a comment:
Looks mostly good. I think we just need to fix a few style issues in the localityEnabled() method.
Thanks, @hililiwei!
Closes #3816