
Flink - Fix incorrect / old row being written into delta files when using upsert mode#4364

Merged
rdblue merged 8 commits into
apache:masterfrom
kbendick:fix-flink-upsert-delta-file-writer
Mar 28, 2022

Conversation

@kbendick
Contributor

@kbendick kbendick commented Mar 18, 2022

Presently, we are writing incorrect data into equality delete files when using upsert mode with Flink.

The rows come in as INSERT row kind, but presently we are calling writer.delete on the entire row, which is the new row.

This leads to incorrect data being written into the equality delete manifests.

Instead, we need to write the delete using only the equality field IDs in that case, so that the delete applies to the old row and not the entire new row.
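The fix can be sketched in miniature. The class below is a hypothetical illustration, not the Iceberg API: rows are plain maps, the equality field IDs are just field names, and `matches` stands in for how a reader applies an equality delete.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: why an equality delete must carry only the
// equality key fields, not the entire incoming row.
public class EqualityDeleteSketch {
    // A row is just field-name -> value here.
    static Map<String, Object> row(Object... kv) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put((String) kv[i], kv[i + 1]);
        }
        return m;
    }

    // Project the row down to the equality key fields.
    static Map<String, Object> keyProjection(Map<String, Object> fullRow, List<String> equalityFields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String f : equalityFields) {
            key.put(f, fullRow.get(f));
        }
        return key;
    }

    // A delete matches a row when the row agrees on every field the delete carries.
    static boolean matches(Map<String, Object> delete, Map<String, Object> row) {
        return delete.entrySet().stream()
            .allMatch(e -> e.getValue().equals(row.get(e.getKey())));
    }

    public static void main(String[] args) {
        List<String> equalityFields = List.of("data", "dt");
        Map<String, Object> newRow = row("data", "aaa", "dt", "2022-03-01", "id", 2);
        Map<String, Object> oldRow = row("data", "aaa", "dt", "2022-03-01", "id", 1);

        // Buggy behavior: the delete carries the entire NEW row, so it
        // never matches the old row ("id" differs) and the old row survives.
        System.out.println(matches(newRow, oldRow));  // false

        // Fixed behavior: the delete carries only the key fields, so it
        // matches every prior row with the same key.
        System.out.println(matches(keyProjection(newRow, equalityFields), oldRow));  // true
    }
}
```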

Co-authored-by: liliwei [email protected]

Opening this PR, as it addresses some of the feedback from #4316 and applies only to Flink 1.14. As @hililiwei and I worked on this together all week, I've marked us as co-authors. We can close that PR, as this one is oriented only towards Flink 1.14 (and has the correct co-authorship in it, so it can be merged at anyone's discretion).

As some code in core has changed, some changes will also need to be applied to earlier versions of Flink. To keep this PR to the minimum required, I'm leaving those out where possible; we can add them in afterwards.

The original issue report is thanks to @xloya in #4311

@kbendick kbendick changed the title Flink - Fix incorrect row being written for delta files when using upsert mode Flink - Fix incorrect / old row being written into delta files when using upsert mode Mar 18, 2022
Comment thread flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java Outdated
@hililiwei
Contributor

For more information about the reproduction, see :#4316 (comment)

Comment thread flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java Outdated
@kbendick kbendick marked this pull request as draft March 20, 2022 22:29
@kbendick kbendick marked this pull request as ready for review March 20, 2022 22:29
@kbendick kbendick changed the title Flink - Fix incorrect / old row being written into delta files when using upsert mode Flink - Fix incorrect / old row being written into delta files when using upsert mode - DO NOT MERGE Mar 20, 2022
Comment thread build.gradle
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch 2 times, most recently from 4c947eb to 4ad7a6e Compare March 21, 2022 01:56
@kbendick kbendick changed the title Flink - Fix incorrect / old row being written into delta files when using upsert mode - DO NOT MERGE Flink - Fix incorrect / old row being written into delta files when using upsert mode Mar 21, 2022
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from 1b82e76 to 65b493b Compare March 21, 2022 02:08
```java
      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
    } else {
      // TODO provide the ability to customize the equality-delete row schema.
```
Contributor Author


This comment seems worth removing, as not having the right equality-delete row schema can cause correctness issues.

@kbendick kbendick closed this Mar 21, 2022
@kbendick kbendick reopened this Mar 21, 2022
@kbendick
Contributor Author

It seems that one of the new tests might be flaky.

```
org.apache.iceberg.flink.TestFlinkUpsert > testCompoundPrimaryKey[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC, isStreaming=true] FAILED
    java.lang.AssertionError: result should have the correct rows expected:<[+I[2, aaa, 2022-03-01], +I[3, bbb, 2022-03-01]]> but was:<[+I[1, aaa, 2022-03-01], +I[3, bbb, 2022-03-01]]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestFlinkUpsert.testCompoundPrimaryKey(TestFlinkUpsert.java:218)
```

Comment thread core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Comment thread core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java Outdated
Comment thread flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java Outdated
Comment thread flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java Outdated
Comment thread flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java Outdated
Contributor Author

@kbendick kbendick left a comment


I pushed another test that also fails.

I do not think that this new test, testMultipleUpsertsToOneRowWithNonPKFieldChanging, has much of anything to do with the changes made in this PR.

However, given that we've been working on this for a while, and that I've been sick for the past few days, I wanted to get this in front of others. I do think it should be handled separately, though.


```java
TestHelpers.assertRows(
    sql("SELECT * FROM %s", tableName),
    Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
```
Contributor Author


The record for PK ('aaa', '2022-03-01') should be what's asserted here, but instead it comes out with the boolean field as true.

Comment on lines +289 to +326
```java
@Test
public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() {
  String tableName = "test_multiple_upserts_on_one_row";
  LocalDate dt = LocalDate.of(2022, 3, 1);
  try {
    sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " +
        "PRIMARY KEY(data,dt) NOT ENFORCED) " +
        "PARTITIONED BY (data) WITH %s",
        tableName, toWithClause(tableUpsertProps));

    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 1, false)," +
        "('aaa', TO_DATE('2022-03-01'), 2, false)," +
        "('bbb', TO_DATE('2022-03-01'), 3, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 2, false), Row.of("bbb", dt, 3, false)));

    // Process several duplicates of the same record with PK ('aaa', TO_DATE('2022-03-01')).
    // Depending on the number of times that records are inserted for that row, one of the
    // rows from two inserts back is used instead.
    //
    // This possibly indicates an issue with the insertedRowMap checking and/or the
    // positional delete writer.
    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, true)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
  } finally {
    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
  }
}
```
Contributor Author


This is the other test that I have found that fails.

It fails on master as well, and I don't think it's caused by this change. But given that we've spent a while on this PR, that this data correctness bug seems somewhat urgent, and that this issue is in the same area, I wanted to get it out there for others to see.

This can ideally be handled in a follow up PR, as I believe it's related to the positional delete writer or the insertedRowMap. See the TODO in BaseTaskWriter#internalPosDelete, which seems to allude to this issue:

TODO attach the previous row if has a positional-delete row schema in appender factory

I would prefer to open a separate PR for this issue, but given that it's an issue involving the same class being fixed here, I wanted to get this out there for others to look at ASAP. cc @openinx
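The two delete paths being discussed can be sketched roughly as follows. This is a simplified illustration, not the Iceberg implementation: keys are plain strings, and `insertedRowMap` and `deleteKey` follow the names used in the discussion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two delete paths around
// BaseTaskWriter#internalPosDelete: a key written earlier in the SAME
// batch can be removed with a position delete, while rows committed in
// earlier batches need an equality delete.
public class PosDeleteSketch {
    private final Map<String, Long> insertedRowMap = new HashMap<>();
    private long nextPos = 0;
    public final List<Long> positionDeletes = new ArrayList<>();
    public final List<String> equalityDeletes = new ArrayList<>();

    public void write(String key) {
        // Track the file position of every row written in this batch.
        insertedRowMap.put(key, nextPos++);
    }

    public void deleteKey(String key) {
        Long pos = insertedRowMap.remove(key);
        if (pos != null) {
            // The row lives in the current data file: position delete.
            positionDeletes.add(pos);
        } else {
            // The row (if any) was committed earlier: equality delete.
            equalityDeletes.add(key);
        }
    }

    public static void main(String[] args) {
        PosDeleteSketch writer = new PosDeleteSketch();
        writer.write("aaa");      // row for key "aaa" lands at position 0
        writer.deleteKey("aaa");  // seen this batch -> position delete
        writer.deleteKey("aaa");  // no longer tracked -> equality delete
        System.out.println(writer.positionDeletes);  // [0]
        System.out.println(writer.equalityDeletes);  // [aaa]
    }
}
```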

Comment thread data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java Outdated
Comment on lines +314 to +319
```java
sql("INSERT INTO %s VALUES " +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, true)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)",
    tableName);
```
Contributor Author


Another update: when changing this to the following insertion statement, the tests pass with the correct results:

```java
sql("INSERT INTO %s VALUES " +
    "('aaa', TO_DATE('2022-03-01'), 1, false)," +
    "('aaa', TO_DATE('2022-03-01'), 2, true)," +
    "('aaa', TO_DATE('2022-03-01'), 3, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)",
    tableName);
```

So I'm not sure what this issue is, but I'm now very sure it's not related to this change at all.

@kbendick
Contributor Author

Given that the new test case testMultipleUpsertsToOneRowWithNonPKFieldChanging can be fixed by using new values for each row as shown here, #4364 (comment), I'm now sure that the issue is unrelated to what this patch is fixing.

I would suggest that we handle it in another PR, as the issue exists in master and is not related to the changes this PR makes.

I've removed the test and will open another issue and a draft PR with the new test case in it so that anybody who wants to can look into it further. 🙂

@rdblue rdblue added this to the Iceberg 0.13.2 Release milestone Mar 28, 2022
@rdblue
Contributor

rdblue commented Mar 28, 2022

Great, with the unnecessary test projection removed, I think this is ready. It also looks like other discussion has been resolved, so I'll merge this.

For the new test failure, I talked with Kyle offline and we'll resolve it in a separate PR, since it appears to be unrelated to this problem: position deletes are the issue there.

@rdblue rdblue merged commit 340a0c5 into apache:master Mar 28, 2022
@kbendick kbendick deleted the fix-flink-upsert-delta-file-writer branch April 13, 2022 16:34
nastra pushed a commit to nastra/iceberg that referenced this pull request May 16, 2022
nastra pushed a commit to nastra/iceberg that referenced this pull request May 17, 2022
nastra pushed a commit to nastra/iceberg that referenced this pull request May 18, 2022
nastra pushed a commit to nastra/iceberg that referenced this pull request May 18, 2022
hililiwei added a commit to hililiwei/iceberg that referenced this pull request Jun 29, 2022
Co-authored-by: liliwei <[email protected]>
(cherry picked from commit 340a0c5)
wobu added a commit to TIKI-Institut/kafka-connect-iceberg-sink that referenced this pull request Nov 24, 2022
- Removed dependency on debezium cdc fields, only depends on existing primary key
- Changed to `deleteKey()`, as in apache/iceberg#4364
- Added partitionField Ids to equality delete schema, as mentioned at https://iceberg.apache.org/docs/latest/flink/#upsert
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 15, 2026
cross-batch duplicates

In upsert mode, INSERT did not call `deleteKey()` before `write()`,
unlike Flink's `BaseDeltaTaskWriter` (established in PR apache#2863, refined
in PR apache#4364). This caused a "shielding" bug: when INSERT and UPDATE
for the same key arrived in the same batch, INSERT's `write()`
populated `insertedRowMap`, causing UPDATE's `deleteKey()` to take the
position-delete path instead of emitting the equality delete needed to
remove the prior batch's row.

The fix adds `deleteKey(keyProjection.wrap(row))` before `write()` for
INSERT when upsert is enabled - matching Flink's behavior. This
produces an equality delete for every INSERT in upsert mode, even for
genuinely new keys (no-op at read time, resolved by compaction). This
is the same trade-off Flink makes.
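The shielding scenario described above can be sketched as follows. This is a simplified model of the commit's description, not the actual writer code: keys are plain strings, and the `Writer` class only tracks which deletes get emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "shielding" bug: a prior batch already
// committed a row for key K, and in the current batch an INSERT for K
// followed by an UPDATE for K must still emit an equality delete so the
// prior batch's row gets masked.
public class ShieldingSketch {
    public static class Writer {
        public Map<String, Long> insertedRowMap = new HashMap<>();
        public long pos = 0;
        public List<String> equalityDeletes = new ArrayList<>();
        public List<Long> positionDeletes = new ArrayList<>();

        public void write(String key) { insertedRowMap.put(key, pos++); }

        public void deleteKey(String key) {
            Long p = insertedRowMap.remove(key);
            if (p != null) positionDeletes.add(p);   // in-batch row
            else equalityDeletes.add(key);           // prior-batch row
        }

        // INSERT in upsert mode, with the fix: delete by key first.
        public void upsertInsert(String key) { deleteKey(key); write(key); }

        // INSERT without the fix: just write.
        public void plainInsert(String key) { write(key); }
    }

    public static void main(String[] args) {
        // Without the fix: INSERT populates insertedRowMap, so UPDATE's
        // deleteKey takes the position-delete path and no equality delete
        // is ever emitted. The prior batch's row survives.
        Writer buggy = new Writer();
        buggy.plainInsert("K");
        buggy.deleteKey("K");
        buggy.write("K");
        System.out.println(buggy.equalityDeletes);  // []

        // With the fix: the INSERT emits the equality delete up front.
        Writer fixed = new Writer();
        fixed.upsertInsert("K");
        fixed.deleteKey("K");
        fixed.write("K");
        System.out.println(fixed.equalityDeletes);  // [K]
    }
}
```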

Adds table-level integration tests (`TestCDCDeltaWriterTableLevel`)
that commit to real Iceberg tables and read back row data, following
the Flink `TestDeltaTaskWriter` pattern. These tests verify that
equality deletes actually mask prior-batch rows - something the
existing mock-based unit tests could not validate.

Also updates documentation with a delete behavior matrix, exactly-once
semantics explanation, and operational guidance for CDC mode.
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 15, 2026
t3hw added a commit to t3hw/iceberg that referenced this pull request Mar 16, 2026
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 16, 2026
7 participants