Skip to content

Flink: Migrate Flink TableSchema to Schema/ResolvedSchema#13072

Merged
pvary merged 23 commits into
apache:mainfrom
liamzwbao:issue-13054-tableschema-deprecations-iceberg-source
Jun 25, 2025
Merged

Flink: Migrate Flink TableSchema to Schema/ResolvedSchema#13072
pvary merged 23 commits into
apache:mainfrom
liamzwbao:issue-13054-tableschema-deprecations-iceberg-source

Conversation

@liamzwbao
Copy link
Copy Markdown
Contributor

@liamzwbao liamzwbao commented May 15, 2025

Summary

This is part of #13054.

To deprecate FlinkSchemaUtil.convert(TableSchema), we also need to deprecate its dependent methods and remove the use of TableSchema. This PR contributes to that goal by removing TableSchema usage from Flink 2.0 and Flink 1.20.

@github-actions github-actions Bot added the flink label May 15, 2025
@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from c73881f to e0f7eea Compare May 18, 2025 00:15
@liamzwbao liamzwbao marked this pull request as ready for review May 18, 2025 00:23
@liamzwbao
Copy link
Copy Markdown
Contributor Author

Hi @ajantha-bhat @nastra. PTAL, thank you!

@liamzwbao liamzwbao changed the title Migrate Flink TableSchema for IcebergSource Flink: Migrate Flink TableSchema for IcebergSource May 18, 2025
@nastra nastra requested a review from pvary May 19, 2025 06:18
@pvary
Copy link
Copy Markdown
Contributor

pvary commented May 19, 2025

I remember previous attempts to remove TableSchema from Flink are failed. Could you please check if we were able to backport these changes to Flink 1.19/1.20?

Thanks for working on this!

Copy link
Copy Markdown
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, looks good. I've left some comments regarding the deprecated methods.

@liamzwbao
Copy link
Copy Markdown
Contributor Author

I remember previous attempts to remove TableSchema from Flink are failed. Could you please check if we were able to backport these changes to Flink 1.19/1.20?

Hi @pvary, thank you for the review! I’m wondering what’s our usual approach for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?

@pvary
Copy link
Copy Markdown
Contributor

pvary commented May 21, 2025

I remember previous attempts to remove TableSchema from Flink are failed. Could you please check if we were able to backport these changes to Flink 1.19/1.20?

Hi @pvary, thank you for the review! I’m wondering what’s our usual approach for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?

That's a good idea. Just please keep this PR 1.20 only, so it is easier to address the comments, and review.

Thanks,
Peter

@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from e0f7eea to 5d2c014 Compare May 22, 2025 00:31
@liamzwbao
Copy link
Copy Markdown
Contributor Author

Made the same change for Flink 1.20, tests passed locally. Please take another look, thank you @pvary @mxm!

@ajantha-bhat ajantha-bhat mentioned this pull request May 22, 2025
3 tasks
@pvary
Copy link
Copy Markdown
Contributor

pvary commented May 23, 2025

@liamzwbao: What is the plan with the other uses of TableSchema? Do we plan to remove those as well?

@liamzwbao
Copy link
Copy Markdown
Contributor Author

@pvary Yep, my plan is to first remove TableSchema from IcebergSource. After that, I’ll deprecate the remaining methods in FlinkSchemaUtil, followed by deprecations in FlinkSink and IcebergSink. I’ll also clean up any remaining usages of TableSchema across the Flink codebase after that.

@mxm
Copy link
Copy Markdown
Contributor

mxm commented May 28, 2025

What are the obstacles for completely getting rid of TableSchema?

@liamzwbao
Copy link
Copy Markdown
Contributor Author

What are the obstacles for completely getting rid of TableSchema?

Hi @mxm, do you mean removing all usages of TableSchema in a single PR? I think that would result in a fairly large change, which might be difficult to review. So my current plan is to break it down into smaller PRs. However, if you believe it's better to handle it all in one PR, I can give that a try.

As for the obstacles, one current discrepancy in the migration is that the legacy TableSchema.builder().build() performs some validation in the build() method. These validations are missing when directly using ResolvedSchema.of(), as they have been moved to the SchemaResolver.

So I see two possible options:

  • Add the missing validation logic into the current utility function.
  • Build a Schema first and then resolve it to ResolvedSchema. However, this approach requires a SchemaResolver, which may involve API changes. I'm not sure how we can obtain a SchemaResolver in this context.

For more discussion, please refer to this thread.

@mxm
Copy link
Copy Markdown
Contributor

mxm commented Jun 3, 2025

Thanks for elaborating! I've responded in the thread. If we can, it would be great to completely get rid of TableSchema in this PR. We can add the steps as separate commits. I leave it up to you though, if you think that is too much work.

@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from 6d185c0 to fa88a8f Compare June 4, 2025 01:11
Comment on lines -471 to -486
TableSchema ts1 = ct1.getSchema();
TableSchema ts2 = ct2.getSchema();
boolean equalsPrimary = false;

if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
equalsPrimary =
Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
&& Objects.equals(
ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
} else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
equalsPrimary = true;
}

if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
&& Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
&& equalsPrimary)) {
Copy link
Copy Markdown
Contributor Author

@liamzwbao liamzwbao Jun 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is already covered by equals of Schema

/**
* @deprecated Use {@link #resolvedSchema(ResolvedSchema)} instead.
*/
@Deprecated
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not add version here as it's marked as Internal

public IcebergTableSource(
TableLoader loader,
TableSchema schema,
ResolvedSchema schema,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this method directly without marking as Deprecated since it's Internal

@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from fa88a8f to 5048493 Compare June 4, 2025 02:06
@liamzwbao
Copy link
Copy Markdown
Contributor Author

Hi @mxm and @pvary, this PR is ready for review. I’ve fully removed/deprecated the use of TableSchema in Flink 2.0/1.20. The changes are split into four commits: IcebergSource, FlinkSchemaUtil, FlinkSink/IcebergSink, and other remaining items such as FlinkCatalog. Please take another look, thanks!

@liamzwbao liamzwbao changed the title Flink: Migrate Flink TableSchema for IcebergSource Flink: Migrate Flink TableSchema to Schema/ResolvedSchema Jun 4, 2025
}

/** Clean up after removing {@link Builder#tableSchema} */
@Deprecated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could we remove this method? 2.0?
Maybe add it to the comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add it because it's not a public method, it's good to have the info tho

…-source

# Conflicts:
#	flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
#	flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
#	flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
#	flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
@pvary
Copy link
Copy Markdown
Contributor

pvary commented Jun 19, 2025

@liamzwbao: This is quite a big change, but seems like a good direction to me.
If I can have 2 requests, I would like to ask you:

  • It is good that we validated that the Flink 1.20, and the Flink 2.0 migration is working, but in the PR only change the thing for Flink 2.0. This helps for the reviewers, so we don't have to double check everything, and also helps the contributors as a requested change is needed to be modified once. Later, when the Flink 2.0 PR is merged, then in a separate PR we backport the final changes to Flink 1.20/1.19, and then we only need to highlight the changes compared to the original PR.
  • Could you please check that we keep calling the old, deprecated methods at least in a few unit tests (maybe add some new duplicated smokes tests). This is needed to ensure that the old functionality still works.

Thanks for all your effort here! I would love to merge these changes after the 2 asks above are addressed.

Thanks,
Peter

@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from 1ef55fd to b4289b1 Compare June 19, 2025 23:19
Copy link
Copy Markdown
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just minor comments, looks good to me otherwise.

Comment thread flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java Outdated
Comment on lines +667 to +703
@Test
public void testConvertFlinkSchemaWithPrimaryKeys() {
Schema icebergSchema =
new Schema(
Lists.newArrayList(
Types.NestedField.required(1, "int", Types.IntegerType.get()),
Types.NestedField.required(2, "string", Types.StringType.get())),
Sets.newHashSet(1, 2));

ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
assertThat(resolvedSchema.getPrimaryKey())
.isPresent()
.get()
.satisfies(k -> assertThat(k.getColumns()).containsExactly("int", "string"));
}

@Test
public void testConvertFlinkSchemaWithNestedColumnInPrimaryKeys() {
Schema icebergSchema =
new Schema(
Lists.newArrayList(
Types.NestedField.required(
1,
"struct",
Types.StructType.of(
Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
Sets.newHashSet(2));

assertThatThrownBy(() -> FlinkSchemaUtil.toResolvedSchema(icebergSchema))
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Invalid primary key")
.hasMessageContaining("Column 'struct.inner' does not exist.");
}

@Test
public void testConvertFlinkTableSchemaBaseOnIcebergSchema() {
Schema baseSchema =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test only change one method call. We could use TestTemplate. On the other hand, removing the old tests will be easier once We remove TableSchema.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback! It's much cleaner now

@liamzwbao liamzwbao force-pushed the issue-13054-tableschema-deprecations-iceberg-source branch from fdce728 to 4e37f51 Compare June 20, 2025 23:44
Copy link
Copy Markdown
Contributor Author

@liamzwbao liamzwbao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pvary, thanks for your advice! I have added necessary tests as requests. I didn't add tests for IcebergSink as it's experimental and probably okay to only cover the new Schema/ResolvedSchema.

@@ -122,7 +122,7 @@ public void before() throws IOException {

@TestTemplate
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not add TableSchema tests for V2 as it is experimental

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are about to remove the experimental status and deprecate the old sink. It would be nice to add a basic test here as well.

Comment on lines +667 to +703
@Test
public void testConvertFlinkSchemaWithPrimaryKeys() {
Schema icebergSchema =
new Schema(
Lists.newArrayList(
Types.NestedField.required(1, "int", Types.IntegerType.get()),
Types.NestedField.required(2, "string", Types.StringType.get())),
Sets.newHashSet(1, 2));

ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
assertThat(resolvedSchema.getPrimaryKey())
.isPresent()
.get()
.satisfies(k -> assertThat(k.getColumns()).containsExactly("int", "string"));
}

@Test
public void testConvertFlinkSchemaWithNestedColumnInPrimaryKeys() {
Schema icebergSchema =
new Schema(
Lists.newArrayList(
Types.NestedField.required(
1,
"struct",
Types.StructType.of(
Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
Sets.newHashSet(2));

assertThatThrownBy(() -> FlinkSchemaUtil.toResolvedSchema(icebergSchema))
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Invalid primary key")
.hasMessageContaining("Column 'struct.inner' does not exist.");
}

@Test
public void testConvertFlinkTableSchemaBaseOnIcebergSchema() {
Schema baseSchema =
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback! It's much cleaner now

@pvary pvary merged commit 522de0f into apache:main Jun 25, 2025
18 checks passed
@pvary
Copy link
Copy Markdown
Contributor

pvary commented Jun 25, 2025

Merged to main.
Thanks for the migration @liamzwbao and @mxm for the review!

@liamzwbao: Please prepare the backport PRs to Flink 1.19/1.20. The following command could help: g diff HEAD~1 HEAD flink/v2.0 |sed "s/v2.0/v1.19/g">/tmp/patch. Please leave a not on the backport PR if you need to modify anything above cleanly applying the patch.

@liamzwbao liamzwbao deleted the issue-13054-tableschema-deprecations-iceberg-source branch June 25, 2025 22:21
liamzwbao added a commit to liamzwbao/iceberg that referenced this pull request Jun 26, 2025
liamzwbao added a commit to liamzwbao/iceberg that referenced this pull request Jun 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants