Flink: Add streaming upsert write option. #2863

Merged: openinx merged 13 commits into apache:master from Reo-LEI:flink-upsert on Sep 13, 2021
Conversation

Contributor

Reo-LEI commented Jul 25, 2021

fork from #1996

author: @openinx

case INSERT:
  if (upsert) {
    writer.delete(row);
  }
Contributor Author

I think we should only delete the row on INSERT. I don't think there is a situation where we would receive only the UPDATE_AFTER row and lose the UPDATE_BEFORE. @openinx please check this.

Member

For an update operation in Flink, the UPDATE_AFTER event will always be emitted to the downstream, while UPDATE_BEFORE is a best-effort or configured behavior of the upstream Flink source. You can take a look at GroupAggFunction: if the Flink source is configured to produce UPDATE_AFTER only, it won't emit any UPDATE_BEFORE to the downstream.

For the downstream Iceberg sink, we need to handle every UPDATE_AFTER as an UPSERT. That also means we need to do nothing for UPDATE_BEFORE, because we will remove the previous key in the next UPDATE_AFTER event.
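
To make the handling concrete, here is a minimal sketch, not the exact code in this PR, of how an upsert-mode writer can map each Flink RowKind onto delete + insert operations (the `writer` and `upsert` fields are assumed to exist on the surrounding class):

```java
import java.io.IOException;
import org.apache.flink.table.data.RowData;

// Minimal sketch of upsert-mode event handling; `writer` and `upsert` are
// assumed fields, not this PR's exact BaseDeltaTaskWriter implementation.
void write(RowData row) throws IOException {
  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      if (upsert) {
        // Remove any previously written row with the same key first.
        writer.delete(row);
      }
      writer.write(row);
      break;

    case UPDATE_BEFORE:
      if (upsert) {
        // Skip: the matching UPDATE_AFTER deletes the key anyway, and
        // deleting here too would remove the same row twice.
        break;
      }
      writer.delete(row);
      break;

    case DELETE:
      writer.delete(row);
      break;

    default:
      throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
  }
}
```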

Contributor Author

I think we should delete the row only on UPDATE_AFTER and do nothing on UPDATE_BEFORE, to prevent deleting the same row twice.

Contributor

Going by the name, I thought INSERT meant "add a new row", in which case we wouldn't need to add a delete for it. But I guess it actually means "append a row (new or updated)".

Contributor Author

As @openinx mentioned in #1996 (comment), we need to transform INSERT/UPDATE_AFTER into UPSERT (delete + insert). If we don't add a delete for the INSERT row when upsert mode is enabled, we will get duplicate rows for the same primary key. For example, if two INSERTs for the same key arrive in different checkpoints, both rows would remain visible unless the second one first deletes the key.

Contributor Author

Reo-LEI commented Jul 25, 2021

I picked #1996 up in this PR, could you take a look? @rdblue @openinx

@coolderli
Contributor

@Reo-LEI any update?

github-actions bot added the core label Jul 28, 2021
Contributor Author

Reo-LEI commented Jul 28, 2021

@Reo-LEI any update?

Yep, I will keep pushing this PR. But I only have time at night, so progress will be slow.

Contributor Author

Reo-LEI commented Jul 31, 2021

ping @openinx @rdblue

Contributor Author

Reo-LEI commented Aug 23, 2021

This PR has been open for some time. I think this feature is very important for users, and it is just waiting for someone to review and merge it. Could anybody take a look? @rdblue @openinx @aokolnychyi @stevenzwu @kbendick

"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
Contributor

For my own learning: must the partition fields be included in the equality fields?

e.g., we could have an equality field (like user_id) while the table is partitioned by hour. Would that be a valid scenario?

Contributor Author

* a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the

As @openinx commented above, we should restrict the partition fields to be a subset of the equality fields, to ensure we can delete the old data in the same partition.

e.g., we can have an equality field (like user_id) and table can be partitioned by hour. would that be a valid scenario?

I think that is not a valid scenario; keeping user_id unique across different hour partitions makes no sense.

Member

If we have a table with user_id and hour, and the business primary key is user_id, then the table should have at most one row for each given user_id. Now let's talk about the partition strategy.

If we just partition the table by the hour field, two different hour partitions may contain the same user_id, because people may insert the user_id in hour=01 and again in hour=02. If we want to keep the primary key semantics, we need to delete the old user_id in hour=01 first, then insert the new user_id in hour=02. But when an INSERT comes, we don't know which partition has the specific user_id, so we would have to broadcast the DELETE to all partitions, which is quite inefficient.
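
To illustrate the two strategies, here is a small sketch with a hypothetical schema (the field names are made up for this example):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical schema whose primary (equality) key is user_id.
Schema schema = new Schema(
    Types.NestedField.required(1, "user_id", Types.LongType.get()),
    Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()));

// Workable for upsert: the partition is derived from the equality field, so
// every row for a given user_id lands in the same partition and the old row
// can be deleted there directly.
PartitionSpec byKey = PartitionSpec.builderFor(schema)
    .bucket("user_id", 16)
    .build();

// Problematic for upsert: rows for the same user_id are spread across hour
// partitions, so a DELETE would have to be broadcast to every partition.
PartitionSpec byHour = PartitionSpec.builderFor(schema)
    .hour("ts")
    .build();
```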

Contributor Author

Reo-LEI commented Sep 5, 2021

I have fixed the unit test, could you retry the CI? :) @openinx

Reo-LEI requested a review from openinx on September 5, 2021 06:13
Member

openinx commented Sep 8, 2021

Retrying the CI ...

openinx closed this Sep 8, 2021
openinx reopened this Sep 8, 2021
openinx changed the title from "Flink: Transform INSERT as one DELETE following one INSERT" to "Flink: Add streaming upsert write option." Sep 8, 2021
Member

openinx commented Sep 8, 2021

LGTM if the Travis CI says OK! Thanks to @Reo-LEI for picking this up!

Contributor

kbendick left a comment

A couple questions for my own understanding.

Overall this looks good to me, but I'll leave the approval to others until I have a chance to look more closely, as Flink is an area I'm somewhat less specialized in. Thanks for working on this @Reo-LEI and @openinx!

Comment on lines +226 to +227
public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
Contributor

Two questions, one that's somewhat unrelated:

  1. Is this only used in streaming mode now? Or does this work with the Flink batch sink as well?
  2. (Somewhat unrelated / thinking out loud) If we have this new write.upsert.enable flag, could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

Member

Is this only used in streaming mode now? Or does this work with Flink batch sink as well?

Yes, it's only used in streaming mode right now. The batch upsert semantics are already implemented by the MERGE INTO clause.

could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

In theory, it's possible to add CDC support for Spark Structured Streaming, though Spark Structured Streaming does not support CDC events natively (I mean Flink supports INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events natively, while Spark streaming doesn't unless we add an extra field to indicate the operation type). I think @XuQianJin-Stars and @chenjunjiedada's team are working on this issue in their own repo.
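
For reference, a minimal sketch of enabling the table-level property quoted above (assuming an existing `Table` handle named `table`):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Turn on upsert semantics for streaming writes into this table.
table.updateProperties()
    .set(TableProperties.UPSERT_MODE_ENABLE, "true")
    .commit();
```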

Comment on lines +83 to +85
if (upsert) {
  break; // UPDATE_BEFORE is unnecessary in upsert mode; doing nothing prevents deleting the same row twice
}
Contributor

Non-blocking question:

Are there possible concerns with events coming out of order for some reason? I guess since the table commits are serializable, this isn't a concern as the same row for these equality fields shouldn't be updated twice in the same commit?

Member

It's a good question, @kbendick! Let's describe the out-of-order concern in two dimensions:

  1. Is it possible to produce disordered events within a single Iceberg transaction? First of all, if we want to maintain the correct data semantics between the source table and the Iceberg sink table, the records consumed from the source table must be in the correct order. Second, the streaming job needs to shuffle based on the equality fields so that records with the same key are dispatched to the same parallel task; otherwise an out-of-order issue can happen if different tasks write records with the same equality fields to the Iceberg table. In this way, the order within a single transaction is guaranteed.

  2. The out-of-order issue between two consecutive transactions. In our Flink streaming integration, we have guaranteed the exact commit order even if a failover happens. For Spark streaming, I think we will need to give this issue more consideration.

Hopefully I've answered your question, @kbendick :-)
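
As a sketch of that wiring (the builder calls follow this PR's FlinkSink API; `input` and `tableLoader` are assumed to exist):

```java
import java.util.Arrays;
import org.apache.iceberg.flink.sink.FlinkSink;

// Declaring the equality field lets the sink shuffle by key, so all events
// for one user_id reach the same writer task and keep their per-key order;
// upsert(true) turns INSERT/UPDATE_AFTER into delete + insert.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .equalityFieldColumns(Arrays.asList("user_id"))
    .upsert(true)
    .append();
```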

Comment on lines +348 to +350
if (upsertMode) {
  Preconditions.checkState(!overwrite,
      "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
Contributor

Can you add a test that verifies the builder doesn't allow combining overwrite and upsert?

Maybe I missed it, but it seems like an important thing to have a test for, in case of future code refactors.
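
As a hedged sketch of what such a test could look like (the fixture names `dataStream` and `tableLoader` are assumed; the test actually added to this PR may differ):

```java
import java.util.Arrays;
import org.apache.iceberg.AssertHelpers;
import org.junit.Test;

@Test
public void testUpsertModeRejectsOverwrite() {
  // Combining upsert with overwrite should fail fast in the builder with the
  // precondition message quoted above.
  AssertHelpers.assertThrows("Should not allow upsert combined with overwrite",
      IllegalStateException.class,
      "OVERWRITE mode shouldn't be enabled",
      () -> FlinkSink.forRowData(dataStream)
          .tableLoader(tableLoader)
          .equalityFieldColumns(Arrays.asList("id"))
          .upsert(true)
          .overwrite(true)
          .append());
}
```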

Member

Agreed!

Contributor Author

Reo-LEI commented Sep 9, 2021

I have added a unit test to cover this, thanks for the reminder! :) @kbendick

Contributor Author

Reo-LEI commented Sep 12, 2021

@openinx @stevenzwu @kbendick do you have any other concerns about this PR? I think this feature is very important for Flink users; we should merge this PR as soon as possible. :)

Member

openinx commented Sep 13, 2021

@Reo-LEI, the PR looks good to me now. Thanks for the patient contribution, and thanks to all for reviewing (@kbendick & @stevenzwu). I will get this merged once the Travis CI says okay!

openinx closed this Sep 13, 2021
openinx reopened this Sep 13, 2021
Member

openinx commented Sep 13, 2021

Reopened PR to retry the CI.

openinx merged commit 3763952 into apache:master on Sep 13, 2021
Reo-LEI deleted the flink-upsert branch on September 13, 2021 14:58
izchen pushed a commit to izchen/iceberg that referenced this pull request Dec 7, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Dec 17, 2021
hameizi mentioned this pull request Feb 10, 2022
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 15, 2026
cross-batch duplicates

In upsert mode, INSERT did not call `deleteKey()` before `write()`,
unlike Flink's `BaseDeltaTaskWriter` (established in PR apache#2863, refined
in PR apache#4364). This caused a "shielding" bug: when INSERT and UPDATE
for the same key arrived in the same batch, INSERT's `write()`
populated `insertedRowMap`, causing UPDATE's `deleteKey()` to take the
position-delete path instead of emitting the equality delete needed to
remove the prior batch's row.

The fix adds `deleteKey(keyProjection.wrap(row))` before `write()` for
INSERT when upsert is enabled - matching Flink's behavior. This
produces an equality delete for every INSERT in upsert mode, even for
genuinely new keys (no-op at read time, resolved by compaction). This
is the same trade-off Flink makes.

Adds table-level integration tests (`TestCDCDeltaWriterTableLevel`)
that commit to real Iceberg tables and read back row data, following
the Flink `TestDeltaTaskWriter` pattern. These tests verify that
equality deletes actually mask prior-batch rows - something the
existing mock-based unit tests could not validate.

Also updates documentation with a delete behavior matrix, exactly-once
semantics explanation, and operational guidance for CDC mode.
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 15, 2026
t3hw added a commit to t3hw/iceberg that referenced this pull request Mar 16, 2026
t3hw added a commit to nanit/iceberg that referenced this pull request Mar 16, 2026