Flink: Migrate Flink TableSchema to Schema/ResolvedSchema #13072
Force-pushed from c73881f to e0f7eea.
Hi @ajantha-bhat @nastra. PTAL, thank you!

I remember that previous attempts to remove TableSchema from Flink failed. Could you please check whether these changes can be backported to Flink 1.19/1.20? Thanks for working on this!
mxm left a comment
Generally, looks good. I've left some comments regarding the deprecated methods.
Hi @pvary, thank you for the review! I'm wondering what our usual approach is for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?

That's a good idea. Just please keep this PR 1.20 only, so it is easier to review and to address the comments. Thanks,
Force-pushed from e0f7eea to 5d2c014.
@liamzwbao: What is the plan with the other uses of |
|
@pvary Yep, my plan is to first remove |
|
What are the obstacles for completely getting rid of TableSchema? |
Hi @mxm, do you mean removing all usages of As for the obstacles, one current discrepancy in the migration is that the legacy So I see two possible options:
For more discussion, please refer to this thread. |
|
Thanks for elaborating! I've responded in the thread. If we can, it would be great to completely get rid of |
Force-pushed from 6d185c0 to fa88a8f.
    TableSchema ts1 = ct1.getSchema();
    TableSchema ts2 = ct2.getSchema();
    boolean equalsPrimary = false;

    if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
      equalsPrimary =
          Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
              && Objects.equals(
                  ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
    } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
      equalsPrimary = true;
    }

    if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
        && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
        && equalsPrimary)) {
This part is already covered by the equals method of Schema.
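To illustrate the point above: when a schema type implements value-based equals, a single equals call subsumes the hand-written comparison of columns, watermark specs, and the optional primary key. The sketch below uses a hypothetical stand-in class (SchemaLike), not Flink's actual Schema or ResolvedSchema, and simplifies the primary key to a list of column names.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for illustration only; this is not Flink's Schema class.
final class SchemaLike {
  final List<String> columns;
  final List<String> watermarkSpecs;
  final Optional<List<String>> primaryKeyColumns;

  SchemaLike(
      List<String> columns, List<String> watermarkSpecs, Optional<List<String>> primaryKeyColumns) {
    this.columns = columns;
    this.watermarkSpecs = watermarkSpecs;
    this.primaryKeyColumns = primaryKeyColumns;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SchemaLike)) {
      return false;
    }
    SchemaLike that = (SchemaLike) o;
    // Optional.equals is true when both are empty or both hold equal values,
    // so no separate "both present / both absent" branching is needed.
    return columns.equals(that.columns)
        && watermarkSpecs.equals(that.watermarkSpecs)
        && primaryKeyColumns.equals(that.primaryKeyColumns);
  }

  @Override
  public int hashCode() {
    return Objects.hash(columns, watermarkSpecs, primaryKeyColumns);
  }
}

final class SchemaEqualsSketch {
  private SchemaEqualsSketch() {}

  // Field-by-field comparison in the style of the code under review.
  static boolean manualEquals(SchemaLike s1, SchemaLike s2) {
    boolean equalsPrimary = false;
    if (s1.primaryKeyColumns.isPresent() && s2.primaryKeyColumns.isPresent()) {
      equalsPrimary = Objects.equals(s1.primaryKeyColumns.get(), s2.primaryKeyColumns.get());
    } else if (!s1.primaryKeyColumns.isPresent() && !s2.primaryKeyColumns.isPresent()) {
      equalsPrimary = true;
    }
    return Objects.equals(s1.columns, s2.columns)
        && Objects.equals(s1.watermarkSpecs, s2.watermarkSpecs)
        && equalsPrimary;
  }
}
```

For any two SchemaLike values, manualEquals and equals agree, which is why the manual branch becomes redundant once the schema type itself is compared.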
    /**
     * @deprecated Use {@link #resolvedSchema(ResolvedSchema)} instead.
     */
    @Deprecated
I did not add a version here because it's marked as Internal.
      public IcebergTableSource(
          TableLoader loader,
    -     TableSchema schema,
    +     ResolvedSchema schema,
I changed this method directly without marking it as Deprecated, since it's Internal.
Force-pushed from fa88a8f to 5048493.
    }

    /** Clean up after removing {@link Builder#tableSchema} */
    @Deprecated
When could we remove this method? 2.0?
Maybe add it to the comment?
I didn't add it because it's not a public method, but it's good to have that information.
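The deprecation pattern discussed in this thread can be sketched roughly as follows: the legacy overload stays as a thin shim that converts its argument and delegates to the new method, and its Javadoc states when it is expected to go away. All names here (LegacySchema, NewSchema, SourceBuilder) and the version strings in the Javadoc are hypothetical placeholders, not the Iceberg or Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the legacy schema type.
final class LegacySchema {
  final String[] fieldNames;

  LegacySchema(String... fieldNames) {
    this.fieldNames = fieldNames.clone();
  }
}

// Hypothetical stand-in for the replacement schema type.
final class NewSchema {
  final List<String> columnNames;

  NewSchema(List<String> columnNames) {
    this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
  }

  // Conversion used only by the deprecated shim below.
  static NewSchema fromLegacy(LegacySchema legacy) {
    return new NewSchema(Arrays.asList(legacy.fieldNames));
  }
}

final class SourceBuilder {
  private NewSchema schema;

  /**
   * @deprecated since X.Y.Z, will be removed in N.0.0 (placeholder versions); use {@link
   *     #resolvedSchema(NewSchema)} instead.
   */
  @Deprecated
  SourceBuilder tableSchema(LegacySchema legacy) {
    // Delegate so that both entry points share one code path.
    return resolvedSchema(NewSchema.fromLegacy(legacy));
  }

  SourceBuilder resolvedSchema(NewSchema newSchema) {
    this.schema = newSchema;
    return this;
  }

  NewSchema schema() {
    return schema;
  }
}
```

Recording the removal version in the Javadoc, even on non-public methods, gives whoever does the later cleanup a concrete target.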
…-source
# Conflicts:
#   flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
#   flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
#   flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
#   flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
|
@liamzwbao: This is quite a big change, but seems like a good direction to me.
Thanks for all your effort here! I would love to merge these changes after the 2 asks above are addressed. Thanks, |
Force-pushed from 1ef55fd to b4289b1.
mxm left a comment
Just minor comments, looks good to me otherwise.
    @Test
    public void testConvertFlinkSchemaWithPrimaryKeys() {
      Schema icebergSchema =
          new Schema(
              Lists.newArrayList(
                  Types.NestedField.required(1, "int", Types.IntegerType.get()),
                  Types.NestedField.required(2, "string", Types.StringType.get())),
              Sets.newHashSet(1, 2));

      ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
      assertThat(resolvedSchema.getPrimaryKey())
          .isPresent()
          .get()
          .satisfies(k -> assertThat(k.getColumns()).containsExactly("int", "string"));
    }

    @Test
    public void testConvertFlinkSchemaWithNestedColumnInPrimaryKeys() {
      Schema icebergSchema =
          new Schema(
              Lists.newArrayList(
                  Types.NestedField.required(
                      1,
                      "struct",
                      Types.StructType.of(
                          Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
              Sets.newHashSet(2));

      assertThatThrownBy(() -> FlinkSchemaUtil.toResolvedSchema(icebergSchema))
          .isInstanceOf(ValidationException.class)
          .hasMessageStartingWith("Invalid primary key")
          .hasMessageContaining("Column 'struct.inner' does not exist.");
    }

    @Test
    public void testConvertFlinkTableSchemaBaseOnIcebergSchema() {
      Schema baseSchema =
These tests only change one method call. We could use TestTemplate. On the other hand, removing the old tests will be easier once we remove TableSchema.
Thanks for your feedback! It's much cleaner now
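The idea raised in this thread, running one test body against both conversion paths instead of duplicating near-identical tests, can be sketched without a test framework. This is a plain-JDK illustration of the parameterization idea, not JUnit's @TestTemplate mechanism; IcebergLikeSchema and both converter functions are hypothetical stand-ins for the legacy and new conversion methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a schema with identifier (primary key) columns.
final class IcebergLikeSchema {
  final List<String> columns;
  final List<String> identifierColumns;

  IcebergLikeSchema(List<String> columns, List<String> identifierColumns) {
    this.columns = columns;
    this.identifierColumns = identifierColumns;
  }
}

final class ConverterParityCheck {
  private ConverterParityCheck() {}

  // Hypothetical legacy conversion path.
  static List<String> legacyPrimaryKey(IcebergLikeSchema schema) {
    return new ArrayList<>(schema.identifierColumns);
  }

  // Hypothetical new conversion path.
  static List<String> resolvedPrimaryKey(IcebergLikeSchema schema) {
    return Collections.unmodifiableList(new ArrayList<>(schema.identifierColumns));
  }

  // The "parameters": every conversion path the shared test body should cover.
  static Map<String, Function<IcebergLikeSchema, List<String>>> converters() {
    Map<String, Function<IcebergLikeSchema, List<String>>> converters = new LinkedHashMap<>();
    converters.put("legacy", ConverterParityCheck::legacyPrimaryKey);
    converters.put("resolved", ConverterParityCheck::resolvedPrimaryKey);
    return converters;
  }

  // Shared test body: runs once per converter and returns the names of those that fail.
  static List<String> failingConverters(IcebergLikeSchema schema, List<String> expectedKey) {
    List<String> failures = new ArrayList<>();
    for (Map.Entry<String, Function<IcebergLikeSchema, List<String>>> entry :
        converters().entrySet()) {
      if (!expectedKey.equals(entry.getValue().apply(schema))) {
        failures.add(entry.getKey());
      }
    }
    return failures;
  }
}
```

Once the legacy path is removed, dropping its entry from converters() is the only change the test needs, which addresses the cleanup concern mentioned above.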
Force-pushed from fdce728 to 4e37f51.
    @@ -122,7 +122,7 @@ public void before() throws IOException {

    @TestTemplate
I did not add TableSchema tests for V2 because it is experimental.
We are about to remove the experimental status and deprecate the old sink. It would be nice to add a basic test here as well.
|
Merged to main. @liamzwbao: Please prepare the backport PRs to Flink 1.19/1.20. The following command could help: |
Summary
This is part of #13054.
To deprecate FlinkSchemaUtil.convert(TableSchema), we also need to deprecate its dependent methods and remove the use of TableSchema. This PR contributes to that goal by removing TableSchema usage from Flink 2.0 and Flink 1.20.