Flink: Add streaming upsert write option. #2863
Conversation
```
# Conflicts:
#	flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
#	flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
```
```java
case INSERT:
  if (upsert) {
    writer.delete(row);
  }
```
I think we could delete the row only on INSERT. I don't think there is a situation where we would have only the UPDATE_AFTER row and have lost the UPDATE_BEFORE. @openinx please check this.
For an update operation in Flink, the UPDATE_AFTER event must always be emitted to the downstream, while the UPDATE_BEFORE is best-effort or a configured behavior of the upstream Flink source. You can take a look at GroupAggFunction: if the Flink source is configured to produce UPDATE_AFTER only, it won't emit any UPDATE_BEFORE to the downstream.
For the downstream Iceberg sink, we need to handle every UPDATE_AFTER as an UPSERT. That also means we should do nothing for UPDATE_BEFORE, because the next UPDATE_AFTER event will remove the previous key anyway.
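A minimal, self-contained sketch of that row-kind dispatch (hypothetical names, not the actual `BaseDeltaTaskWriter` API) might look like this: in upsert mode, INSERT and UPDATE_AFTER each emit an equality delete followed by the new row, while UPDATE_BEFORE is ignored because the next UPDATE_AFTER removes the old key.

```java
import java.util.ArrayList;
import java.util.List;

public class UpsertDispatch {
  enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Returns the write actions ("DELETE <key>" / "WRITE <key>") for one event.
  public static List<String> apply(RowKind kind, String key, boolean upsert) {
    List<String> actions = new ArrayList<>();
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          actions.add("DELETE " + key);  // drop any previous row with this key
        }
        actions.add("WRITE " + key);
        break;
      case UPDATE_BEFORE:
        if (!upsert) {
          actions.add("DELETE " + key);
        }
        // in upsert mode: do nothing, to avoid deleting the same row twice
        break;
      case DELETE:
        actions.add("DELETE " + key);
        break;
    }
    return actions;
  }
}
```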
I think we should delete the row only on UPDATE_AFTER and do nothing for UPDATE_BEFORE, to prevent deleting the same row twice.
By the name, I thought INSERT meant "add a new row", in which case we wouldn't need to add a delete for it. But I guess it actually means "append a row (new or updated)".
As @openinx mentioned in #1996 (comment), we need to transform INSERT/UPDATE_AFTER into an UPSERT (delete + insert). If we don't add a delete for the INSERT row when upsert mode is enabled, we will get duplicate rows for the same primary key.
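To illustrate why the delete on INSERT matters, here is a toy model (all names hypothetical, not Iceberg code) showing that skipping the equality delete leaves two live rows for one primary key:

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateKeyDemo {
  // rows are (key, value) pairs; the "table" is just a list of live rows
  public static List<String[]> ingest(List<String[]> events, boolean deleteOnInsert) {
    List<String[]> table = new ArrayList<>();
    for (String[] e : events) {
      if (deleteOnInsert) {
        table.removeIf(row -> row[0].equals(e[0]));  // equality delete on the key
      }
      table.add(e);
    }
    return table;
  }
}
```

With `deleteOnInsert` enabled, a second INSERT for the same key replaces the first row; without it, both rows survive, which is exactly the duplicate-row problem described above.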
@Reo-LEI any update?
Yep, I will keep pushing this PR. But I only have time at night, so progress will be slow.
```
# Conflicts:
#	flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
#	flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
#	flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
#	flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
#	flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
```
This PR has been open for some time. I think this feature is very important for users, and it is just waiting for someone to review and merge it. Can anybody take a look? @rdblue @openinx @aokolnychyi @stevenzwu @kbendick
…e when upsert is enable.
```java
    "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
    "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
```
For my own learning: must the partition fields be included in the equality fields?
e.g., we can have an equality field (like user_id) and the table can be partitioned by hour. Would that be a valid scenario?
As @openinx commented above, we should restrict the partition fields to be a subset of the equality fields, to ensure we can delete the old data within the same partition.

> e.g., we can have an equality field (like user_id) and the table can be partitioned by hour. Would that be a valid scenario?

I think that is not a valid scenario; keeping user_id unique across all the different hour partitions makes no sense.
If we have a table with user_id and hour, and the business primary key is user_id, then the table should have at most one row for each given user_id. Now let's talk about the partition strategy.
If we just partition the table by the hour field, two different hour partitions may contain the same user_id, because people may insert the user_id in hour=01 and again in hour=02. If we want to keep the primary key semantics, we need to delete the old user_id in hour=01 first, then insert the new user_id in hour=02. But when an INSERT comes, we don't know which partition has the specific user_id, so we would have to broadcast the DELETE to all partitions, which is quite inefficient.
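The constraint discussed here can be sketched as a simple subset check (hypothetical helper names, not the real builder code): upsert mode is valid only if every partition source field is also an equality field, so a delete for a given key always lands in exactly one partition.

```java
import java.util.List;
import java.util.Set;

public class PartitionKeyCheck {
  // true when the partition source fields are a subset of the equality fields
  public static boolean partitionIsSubsetOfKey(List<String> partitionFields,
                                               Set<String> equalityFields) {
    return equalityFields.containsAll(partitionFields);
  }
}
```

Under this rule, partitioning by hour while keying on user_id alone would be rejected, while partitioning by a bucket of user_id would pass.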
I have fixed the unit test, could you retry the CI? :) @openinx
Retrying CI ...
LGTM if the Travis CI says OK! Thanks to @Reo-LEI for picking this up!
```java
public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
```
Two questions, one that's somewhat unrelated:
- Is this only used in streaming mode now? Or does this work with the Flink batch sink as well?
- (Somewhat unrelated / thinking out loud) If we have this new `write.upsert.enable` flag, could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?
> Is this only used in streaming mode now? Or does this work with the Flink batch sink as well?

Yes, it's only used in streaming mode right now. The batch upsert semantics are already implemented correctly by the MERGE INTO clause.

> Could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

In theory, it's possible to add CDC support for Spark Structured Streaming, though Spark Structured Streaming does not support CDC events natively (I mean Flink supports INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events natively while Spark streaming doesn't, unless we add an extra field to indicate the operation type). I think @XuQianJin-Stars and @chenjunjiedada's team are working on this issue in their own repo.
```java
if (upsert) {
  break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
```
Non-blocking question:
Are there possible concerns with events arriving out of order for some reason? I guess since the table commits are serializable, this isn't a concern, as the same row for these equality fields shouldn't be updated twice in the same commit?
It's a good question, @kbendick! Let's describe the out-of-order issue along two dimensions:

- Is it possible to produce disordered events within a single Iceberg transaction? First, if we want to maintain correct data semantics between the source table and the Iceberg sink table, the records consumed from the source table must arrive in the correct order. Second, the streaming job needs to shuffle based on the equality fields so that records with the same key are dispatched to the same parallel task; otherwise, an out-of-order issue arises when different tasks write records with the same equality fields to the Iceberg table. In this way, the order within a single transaction is guaranteed.
- The out-of-order issue between two consecutive transactions. In our Flink stream integration, we guarantee the exact commit order even if a failover happens. For Spark streaming, I think this issue will need more consideration.
Hopefully, I've answered your question, @kbendick :-)
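The key-based shuffle mentioned above can be sketched in a few lines (hypothetical names, a simplified stand-in for Flink's keyBy): routing every record by a hash of its equality-field values means all records for one key go to a single writer task, which preserves their relative order within a transaction.

```java
import java.util.Objects;

public class KeyedShuffle {
  // Mimics a keyBy(): key hash modulo parallelism picks the writer task index,
  // so the same equality key is always handled by the same task.
  public static int taskFor(Object equalityKey, int parallelism) {
    return Math.floorMod(Objects.hashCode(equalityKey), parallelism);
  }
}
```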
```java
if (upsertMode) {
  Preconditions.checkState(!overwrite,
      "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
```
Can you add a test that verifies the builder doesn't allow overwrite together with upsert / upsertMode?
Maybe I missed it, but this seems like an important thing to test in case of future code refactors.
I have added a unit test to cover this, thanks for your reminder! :) @kbendick
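A self-contained model of that builder validation (simplified: a plain IllegalStateException stands in for Guava's Preconditions, and the class and method names are hypothetical) shows what such a test has to pin down:

```java
public class SinkOptions {
  // Fails fast when upsert and overwrite are requested together.
  public static void validate(boolean upsertMode, boolean overwrite) {
    if (upsertMode && overwrite) {
      throw new IllegalStateException(
          "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
    }
  }
}
```

A test would assert that `validate(true, false)` and `validate(false, true)` pass while `validate(true, true)` throws, so a future refactor cannot silently drop the check.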
@openinx @stevenzwu @kbendick do you have any other concerns about this PR? I think this feature is very important for Flink users; we should merge this PR as soon as possible. :)
@Reo-LEI, the PR looks good to me now. Thanks for the patient contribution, and thanks all for reviewing (@kbendick & @stevenzwu). I will get this merged once the Travis CI says okay!
Reopened the PR to retry the CI.
cross-batch duplicates

In upsert mode, INSERT did not call `deleteKey()` before `write()`, unlike Flink's `BaseDeltaTaskWriter` (established in PR apache#2863, refined in PR apache#4364). This caused a "shielding" bug: when INSERT and UPDATE for the same key arrived in the same batch, INSERT's `write()` populated `insertedRowMap`, causing UPDATE's `deleteKey()` to take the position-delete path instead of emitting the equality delete needed to remove the prior batch's row.

The fix adds `deleteKey(keyProjection.wrap(row))` before `write()` for INSERT when upsert is enabled, matching Flink's behavior. This produces an equality delete for every INSERT in upsert mode, even for genuinely new keys (a no-op at read time, resolved by compaction). This is the same trade-off Flink makes.

Adds table-level integration tests (`TestCDCDeltaWriterTableLevel`) that commit to real Iceberg tables and read back row data, following the Flink `TestDeltaTaskWriter` pattern. These tests verify that equality deletes actually mask prior-batch rows, something the existing mock-based unit tests could not validate.

Also updates documentation with a delete behavior matrix, exactly-once semantics explanation, and operational guidance for CDC mode.
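The shielding bug described above can be reproduced in a toy model (hypothetical names; `insertedRowMap` tracks rows written in the current batch, `deleteKey()` emits a position delete for a same-batch row but an equality delete otherwise). If INSERT skips `deleteKey()`, a later UPDATE in the same batch only position-deletes the in-batch copy and never emits the equality delete that would mask the prior batch's row:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShieldingDemo {
  final Map<String, Integer> insertedRowMap = new HashMap<>();  // key -> position in this batch
  final List<String> deletes = new ArrayList<>();
  int nextPos = 0;

  void deleteKey(String key) {
    Integer pos = insertedRowMap.remove(key);
    if (pos != null) {
      deletes.add("POS " + pos);  // row was written earlier in this batch
    } else {
      deletes.add("EQ " + key);   // row may exist in a prior batch: equality delete
    }
  }

  void write(String key) {
    insertedRowMap.put(key, nextPos++);
  }

  // Replays INSERT(key) then UPDATE(key) in one batch; returns the emitted deletes.
  public static List<String> insertThenUpdate(String key, boolean deleteKeyOnInsert) {
    ShieldingDemo w = new ShieldingDemo();
    if (deleteKeyOnInsert) {
      w.deleteKey(key);  // the fix: equality delete before every upsert write
    }
    w.write(key);
    w.deleteKey(key);    // UPDATE: delete the old copy, then write the new one
    w.write(key);
    return w.deletes;
  }
}
```

Without the fix the only delete emitted is a position delete for the in-batch INSERT, so a row with the same key committed in an earlier batch would survive; with the fix an equality delete is emitted that masks it.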
fork from #1996
author: @openinx