Flink: Support source watermark for flink sql windows#12191
Conversation
| } | ||
|
|
||
| protected static String toWithClause(Map<String, String> props) { | ||
| public static String toWithClause(Map<String, String> props) { |
There was a problem hiding this comment.
Why is this change? Did this become a utility method which is shared between tests? Can we find a better place for it then?
There was a problem hiding this comment.
This is currently being across Tests under org.apache.iceberg.flink. Wanted to use for test under org.apache.iceberg.flink.source. org.apache.iceberg.flink.source.SqlHelpers can be another option, where this can be done.
| .isInstanceOf(NullPointerException.class) | ||
| .hasMessage("watermark-column needs to be configured to use source watermark."); | ||
| } finally { | ||
| SqlHelpers.sql(getStreamingTableEnv(), "DROP TABLE IF EXISTS %s", flinkTable); |
There was a problem hiding this comment.
Why is this try finally? Can we just do a cleanup in an after method? That would make the tests easier to read
There was a problem hiding this comment.
Flink dynamic table is created only in specific tests in this class, so was handling here. But can do in after to improve readability.
There was a problem hiding this comment.
In TestSqlBase we have several tests where we create tables, and we expect that they are removed because of the @TempDir is removed. So we can simply just do the same here.
WDYT?
|
Nit: Renamed the PR to match the general patterns |
8255c66 to
4f9ef05
Compare
|
|
||
| @Override | ||
| public void applySourceWatermark() { | ||
| if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) { |
There was a problem hiding this comment.
Why is this not using Preconditions for the check?
There was a problem hiding this comment.
Thought it would be better to throw UnsupportedOperation Exception instead of IllegalArgument or IllegalStateException. If either of them is fine with current error message , I can move to Preconditions.
There was a problem hiding this comment.
I would suggest to move to Preconditions.checkArgument - we usually stick to that for this kind of checks
| TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); | ||
| } | ||
| } | ||
| } |
mxm
left a comment
There was a problem hiding this comment.
Thanks @swapna267! Changes look good to me.
|
Merged to main |
|
Thanks all. Will submit backport PR soon. |
Iceberg Source to support Source Watermark, so it can be used in Flink WINDOW functions. https://github.com/apache/flink/blob/release-1.18/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsSourceWatermark.java enables Flink to rely on the watermark strategy provided by the ScanTableSource itself.
Reference:
#10219
#9346
Previous discussion in PR, #12116 . Split into separate PR for easy review.