[Improve][Core] Support close-table events for bounded multi-table jobs #10874
davidzollo wants to merge 1 commit into
Conversation
DanielLeens left a comment
Thanks for working on this. I pulled the latest head locally as seatunnel-review-10874 against upstream/dev and traced the full bounded JDBC multi-table path. I did not run the PR locally in this batch; this is a source-level review plus the current GitHub check metadata.
What This PR Fixes
- User pain: in bounded multi-table JDBC jobs, the runtime can only clean up when the whole source finishes, so a table that is already fully consumed cannot release its sink-side writer/resources early.
- Fix approach: add a per-table completion protocol: the JDBC reader reports split completion to the enumerator, the enumerator decides when a table is globally finished, the reader emits a CloseTableEvent, and the multi-table sink closes the matching per-table writer (a sketch of the signal's payload follows below).
- One-line summary: this PR introduces a real close-table control path so finished tables can shut down before the whole job ends.
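To make that concrete, here is a minimal sketch of the kind of payload such a close-table signal needs to carry so the sink can both match the right per-table writer and deduplicate acknowledgements per upstream reader. The class and field names are hypothetical, not the actual CloseTableEvent added by this PR.

```java
import java.io.Serializable;

// Illustrative payload for a per-table close signal; names are hypothetical,
// not the actual CloseTableEvent class introduced by this PR.
public final class CloseTableSignalSketch implements Serializable {
    private final String tablePath;      // which logical table has finished
    private final int sourceSubtaskId;   // which upstream reader is acknowledging
    private final int requiredAckCount;  // how many reader acknowledgements close the table

    public CloseTableSignalSketch(String tablePath, int sourceSubtaskId, int requiredAckCount) {
        this.tablePath = tablePath;
        this.sourceSubtaskId = sourceSubtaskId;
        this.requiredAckCount = requiredAckCount;
    }

    public String getTablePath() { return tablePath; }
    public int getSourceSubtaskId() { return sourceSubtaskId; }
    public int getRequiredAckCount() { return requiredAckCount; }
}
```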
Full Runtime Chain I Rechecked
bounded multi-table JDBC source
-> JdbcSourceSplitEnumerator.run()
-> splitter.generateSplits(table)
-> addPendingSplit(...)
-> assignSplit(...)
reader consumes split
-> JdbcSourceReader.pollNext()
-> inputFormat.open(split)
-> output.collect(row)
-> context.sendSourceEventToEnumerator(new JdbcSplitFinishedEvent(tablePath))
enumerator tracks per-table progress
-> JdbcSourceSplitEnumerator.handleSourceEvent(...)
-> unfinishedSplitsPerTable[table]--
-> when remain reaches 0
-> send JdbcTableFinishedEvent(tablePath, expectedCloseEventCount)
to every participating reader
reader converts source event to downstream control event
-> JdbcSourceReader.handleSourceEvent(...)
-> pendingGlobalCloseTables.remove(table)
-> pendingCloseTableEvents.add(new CloseTableEvent(...))
source -> transform -> sink propagation
-> SeaTunnelSourceCollector.collect(CloseTableEvent)
-> sendRecordToNext(new Record<>(event))
-> TransformFlowLifeCycle.received(...)
-> transform.mapCloseTableEvent(...)
-> SinkFlowLifeCycle.received(...)
-> writer.handleCloseTableEvent(event)
multi-table sink close
-> MultiTableSinkWriter.handleCloseTableEvent(...)
-> aggregate sourceSubtaskId acknowledgements
-> once currentCount == requiredCount
-> closeTable(tableId)
-> waitUntilTableQueueDrained(tableId)
-> remove per-table writer(s)
-> sinkWriter.close()
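One step in that chain is easy to get wrong: the reader does not forward the enumerator's table-finished signal immediately; it queues it and emits the downstream close event from its poll loop. A minimal sketch of that hand-off, with illustrative names (the real logic lives in JdbcSourceReader.handleSourceEvent / pollNext):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Sketch of the reader-side hand-off: a table-finished signal from the enumerator is
// queued, and the downstream close signal is emitted later from the poll loop.
class ReaderCloseTableQueueSketch {
    private final Deque<String> pendingCloseTables = new ArrayDeque<>();

    // Called when the enumerator reports that a table is globally finished.
    void onTableFinished(String tablePath) {
        pendingCloseTables.add(tablePath);
    }

    // Called from the reader's poll loop; emits at most one pending close signal per call.
    void pollNext(Consumer<String> downstream) {
        String table = pendingCloseTables.poll();
        if (table != null) {
            // stands in for collector.collect(new CloseTableEvent(...))
            downstream.accept(table);
        }
    }
}
```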
Core Logic Review
Exact change
- Source-side protocol bookkeeping:
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:51-76,151-187,215-243
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReader.java:47-50,76-109,124-135
- Engine event forwarding:
  - seatunnel-engine/.../SeaTunnelSourceCollector.java:139-145
  - seatunnel-engine/.../TransformFlowLifeCycle.java:118-141
  - seatunnel-engine/.../SinkFlowLifeCycle.java:260-272
- Sink-side close handling:
  - seatunnel-api/.../SupportCloseTableSinkWriter.java:24-32
  - seatunnel-api/.../MultiTableSinkWriter.java:79-85,239-268,350-425
Before / after summary
Before this PR, the JDBC reader just finished the split and closed the input format; there was no downstream “this table is now globally finished” signal.
After this PR:
- the reader reports JdbcSplitFinishedEvent,
- the enumerator counts unfinished splits per table (sketched below),
- the reader turns JdbcTableFinishedEvent into CloseTableEvent,
- and the multi-table sink closes the matched table writer once all expected source subtasks have acknowledged that table.
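The enumerator-side bookkeeping in that list boils down to a per-table counter, roughly like the following sketch (illustrative names; the real fields sit in JdbcSourceSplitEnumerator):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-table bookkeeping described above: bump the counter when a split is
// registered, decrement when a reader reports the split finished, and signal
// "table finished" once it reaches zero.
class TableProgressSketch {
    private final Map<String, Integer> unfinishedSplitsPerTable = new HashMap<>();

    void recordPendingSplit(String tablePath) {
        unfinishedSplitsPerTable.merge(tablePath, 1, Integer::sum);
    }

    /** Returns true when the last split of the table has finished. */
    boolean onSplitFinished(String tablePath) {
        Integer remain = unfinishedSplitsPerTable.computeIfPresent(tablePath, (k, v) -> v - 1);
        if (remain != null && remain == 0) {
            unfinishedSplitsPerTable.remove(tablePath);
            return true; // caller would broadcast the table-finished event to readers here
        }
        return false;
    }
}
```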
Key findings
- The normal no-failure bounded JDBC multi-table path does hit the new logic.
- This is not a local helper change; it is a new cross-layer runtime protocol, so returned splits and checkpoint/restore are part of the main correctness surface.
- The happy path is directionally right, but the returned-split accounting is wrong today.
- The checkpoint/restore boundary is also incomplete, so the protocol can break after recovery even if it works on a fresh run.
Findings
Issue 1: returned splits are counted twice, so the final close-table event can be delayed forever
- Location:
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:115-127
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:157-179
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:215-243
- Severity: High
- Why this is a blocker: unfinishedSplitsPerTable is supposed to represent “how many splits of this table are still unfinished”. But addSplitsBack(...) calls recordPendingSplit(...) again for a returned split, which increments the unfinished counter one more time even though the original in-flight split was never decremented. That means a returned split can permanently inflate the remaining count.
- Risk:
  - the table never reaches zero,
  - JdbcTableFinishedEvent is never emitted,
  - the downstream CloseTableEvent never arrives,
  - and the per-table sink writer cannot be closed on failover / split-return paths.
- Better fix:
  - Preferred: model split ownership separately from the unfinished total, and move returned splits without re-incrementing the table-level counter.
  - Smaller patch: at minimum, distinguish “brand-new split” from “returned split” so the same split does not call merge(..., 1, Integer::sum) twice; a sketch of that direction follows below.
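A hedged sketch of that smaller patch, assuming a hypothetical split id: a returned split is re-queued for assignment without touching the table counter it was already counted toward.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the "smaller patch" for Issue 1: remember which splits were already counted,
// so a split handed back via addSplitsBack(...) is re-queued for assignment without
// bumping the table counter a second time. All names are illustrative.
class ReturnedSplitAccountingSketch {
    private final Map<String, Integer> unfinishedSplitsPerTable = new HashMap<>();
    private final Set<String> countedSplitIds = new HashSet<>();

    void recordPendingSplit(String splitId, String tablePath) {
        // Only a brand-new split increases the unfinished count; a returned split does not.
        if (countedSplitIds.add(splitId)) {
            unfinishedSplitsPerTable.merge(tablePath, 1, Integer::sum);
        }
    }

    void onSplitFinished(String splitId, String tablePath) {
        countedSplitIds.remove(splitId);
        unfinishedSplitsPerTable.computeIfPresent(tablePath, (k, v) -> v - 1);
    }
}
```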
Issue 2: the close-table protocol state is not checkpointed/restored end to end
- Location:
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:183-187,226-243
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReader.java:47-50,106-108,124-135
  - seatunnel-engine/.../SourceFlowLifeCycle.java:395-399
- Severity: High
- Why this is a blocker:
  The new protocol needs more than just pending splits. Today:
  - the enumerator snapshot still only stores pendingTables + pendingSplits,
  - the reader snapshot still only stores splits,
  - unfinishedSplitsPerTable, readersPerTable, pendingCloseTableEvents, and pendingGlobalCloseTables are all in-memory only.
  That breaks recovery in two different ways:
  - splits already assigned to readers at checkpoint time are no longer in pendingSplits, so after restore the enumerator cannot rebuild the unfinished-per-table accounting for those in-flight splits. When the restored reader later reports JdbcSplitFinishedEvent, remain == null and the event is ignored.
  - if checkpoint happens after the reader has received JdbcTableFinishedEvent but before pollNext() emits the downstream CloseTableEvent, that pending close signal is lost too.
- Risk:
  - a restored bounded multi-table job can behave differently from a fresh run,
  - some tables may never emit the final close-table signal after recovery,
  - sink writers stay open longer than intended or until task shutdown,
  - and the protocol is not actually checkpoint-safe in its current form.
- Better fix:
  - Preferred: explicitly checkpoint the minimal protocol state on both sides: per-table unfinished counts / participant readers in the enumerator, plus pending close signals in the reader (see the snapshot-state sketch below).
  - Alternative: redesign the recovery boundary so the enumerator can deterministically rebuild per-table unfinished state from restored reader splits instead of depending only on pendingSplits.
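As a rough illustration of the preferred fix, the snapshots would have to carry the protocol state explicitly on both sides; the class and field names below are assumptions, not the PR's actual state classes.

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;

// Sketch of the minimal protocol state a checkpoint would need to carry.
class EnumeratorProtocolStateSketch implements Serializable {
    Map<String, Integer> unfinishedSplitsPerTable;   // per-table remaining split counts
    Map<String, List<Integer>> readersPerTable;      // which reader subtasks participate per table
    // ...plus the existing pendingTables / pendingSplits
}

class ReaderProtocolStateSketch implements Serializable {
    List<String> pendingCloseTables;        // table-finished signals received but not yet emitted
    List<String> pendingGlobalCloseTables;  // tables still awaiting a global-finished signal
    // ...plus the existing in-flight splits
}
```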
Issue 3: the new tests only cover the happy path, not split-return or recovery
- Location:
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumeratorTest.java:209-363
  - seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReaderTest.java:39-76
  - seatunnel-api/.../MultiTableSinkWriterTest.java:69-107
- Severity: Medium
- Why this matters:
  The new tests prove the straight-line path, but they do not cover the two most important regression surfaces for this protocol:
  - addSplitsBack(...)
  - checkpoint/restore around pending table-close signals
- Better fix:
  - add one regression test for split return / reassignment without double-counting unfinished splits (a sketch of that test shape follows below);
  - add one recovery test where readers restore in-flight splits and still eventually produce the correct CloseTableEvent.
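A sketch of what the first regression test could look like. It uses a tiny stand-in counter rather than the real JdbcSourceSplitEnumerator, so the class and method names are illustrative; a real test would drive the enumerator through addSplitsBack(...) instead.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

// Sketch of the missing regression test: return a split after assignment and assert the
// table still becomes closable once the reassigned split finishes exactly once.
class ReturnedSplitRegressionTestSketch {

    // Minimal stand-in for the enumerator's per-table unfinished-split counter.
    static class Counter {
        final Map<String, Integer> remaining = new HashMap<>();
        void add(String table) { remaining.merge(table, 1, Integer::sum); }
        void finish(String table) { remaining.computeIfPresent(table, (k, v) -> v - 1); }
        boolean done(String table) { return remaining.getOrDefault(table, 0) == 0; }
    }

    @Test
    void returnedSplitMustNotInflateUnfinishedCount() {
        Counter counter = new Counter();
        counter.add("db.table_a");      // split generated and assigned once
        // split is returned on failover -- the fix means add(...) is NOT called again here
        counter.finish("db.table_a");   // reassigned split eventually finishes
        assertTrue(counter.done("db.table_a"), "table should reach zero and emit the close signal");
    }
}
```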
Compatibility / Side Effects
- API / config / defaults: no direct public user-facing break.
- Compatibility conclusion: partially compatible. The external config/API surface is mostly fine, but the new internal protocol is not checkpoint-safe yet, so restored behavior is not equivalent to fresh-run behavior.
- CPU / memory / network overhead: modest. The main concern is correctness, not raw performance.
- Retry / idempotency:
- sink-side close deduplication is directionally reasonable,
- source-side returned-split accounting is not retry-safe yet.
Tests / CI
- The added tests cover the happy path, but not the real failover/recovery edges described above.
- The current GitHub Build check is also red. The visible failure surface includes:
  - Run / jdbc-connectors-it-part-1, whose job page annotations repeatedly show "Run SeaTunnel on spark failed"
  - Run / jdbc-connectors-it-part-2/3/5/6/7
  - Run / all-connectors-it-3
  - Run / paimon-connector-it
  - Run / connector-file-sftp-it
  - Run / Dead links
  - Run / unit-test (11, windows-latest)
I cannot honestly attribute every failing job to this PR from the currently visible public annotations alone, but the PR definitely does not have a trustworthy green CI signal right now.
Merge Decision
Conclusion: can merge after fixes
- Blocking items
- Issue 1: fix the returned-split accounting so unfinished split counts are not incremented twice.
- Issue 2: make the close-table protocol checkpoint/restore-safe across both enumerator and reader state.
- Suggested non-blocking follow-up
- Issue 3: add targeted regression coverage for split-return and recovery paths so these bugs do not reopen.
Overall, I agree with the problem this PR is trying to solve, and the event-driven direction is the right one. But this is a cross-layer runtime protocol change touching seatunnel-api, seatunnel-engine, and the JDBC source path. For that kind of change, the failover and restore semantics are part of the main path, not an edge detail. Once the returned-split accounting and recovery-state boundary are fixed, I think this design becomes much easier to trust.
What changed
- CloseTableEvent and SupportCloseTableSinkWriter hook in seatunnel-api
- MultiTableSinkWriter to close a table only after all expected upstream close-table signals arrive
- TableRenameTransform
Why
Bounded multi-table jobs currently keep per-table sink resources alive until the whole task closes because there is no table-complete control event in the open-source pipeline.
The tricky part is parallelism: a single "table finished" event is not safe when multiple upstream readers can still have in-flight rows for the same table. This PR fixes that by carrying upstream-reader progress in the close-table event and only closing a table after all participating readers have reported completion.
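A minimal sketch of that parallel-safe close rule, with illustrative names (the actual logic lives in MultiTableSinkWriter.handleCloseTableEvent): count distinct upstream-reader acknowledgements per table and only close the per-table writer once the required count is reached.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the parallel-safe close rule: a table closes only after every participating
// upstream reader has acknowledged it. Names are illustrative, not the actual
// MultiTableSinkWriter fields.
class CloseTableAggregatorSketch {
    private final Map<String, Set<Integer>> acksPerTable = new HashMap<>();

    /** Returns true once all expected source subtasks have acknowledged the table. */
    boolean onCloseTableEvent(String tableId, int sourceSubtaskId, int requiredCount) {
        Set<Integer> acks = acksPerTable.computeIfAbsent(tableId, k -> new HashSet<>());
        acks.add(sourceSubtaskId); // deduplicates repeated acknowledgements from the same reader
        if (acks.size() >= requiredCount) {
            acksPerTable.remove(tableId);
            return true; // caller would drain the table's queue and close its writer here
        }
        return false;
    }
}
```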
Closes #10873
Validation
- ./mvnw spotless:apply
- ./mvnw -pl seatunnel-api -Dtest=MultiTableSinkWriterTest test
- ./mvnw -pl seatunnel-connectors-v2/connector-jdbc -Dtest=JdbcSourceReaderTest,JdbcSourceSplitEnumeratorTest test
- ./mvnw -pl seatunnel-transforms-v2 -Dtest=TableRenameTransformTest test
- ./mvnw -pl seatunnel-engine/seatunnel-engine-server -DskipTests compile
Notes
./mvnw -pl seatunnel-engine/seatunnel-engine-server -am -DskipTests compile still fails in the pre-existing seatunnel-config-shade module because it cannot resolve shaded typesafe-config classes. That failure is outside this PR's change set.