
[Improve][Core] Support close-table events for bounded multi-table jobs #10874

Draft
davidzollo wants to merge 1 commit into apache:dev from davidzollo:david_st2519-close-table-event

Conversation

@davidzollo
Contributor

What changed

  • added a generic CloseTableEvent and SupportCloseTableSinkWriter hook in seatunnel-api
  • added engine-side source-reader reverse event delivery so enumerators can notify readers about table-level completion
  • propagated close-table events through source -> transform -> sink lifecycles
  • taught MultiTableSinkWriter to close a table only after all expected upstream close-table signals arrive
  • added a bounded JDBC source implementation that reports split completion, aggregates per-table completion in the enumerator, and emits close-table progress safely to downstream
  • preserved close-table event metadata in TableRenameTransform
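The new API surface can be pictured with a minimal sketch. The type names `CloseTableEvent` and `SupportCloseTableSinkWriter` come from this PR, but the fields and method signatures below are assumptions for illustration only, not the actual seatunnel-api definitions:

```java
// Illustrative sketch only -- the real types live in seatunnel-api;
// the fields and signatures here are assumptions, not the PR's code.
final class CloseTableEvent {
    private final String tablePath;       // identifier of the finished table
    private final int sourceSubtaskId;    // which upstream reader emitted it
    private final int expectedEventCount; // how many readers participate in this table

    CloseTableEvent(String tablePath, int sourceSubtaskId, int expectedEventCount) {
        this.tablePath = tablePath;
        this.sourceSubtaskId = sourceSubtaskId;
        this.expectedEventCount = expectedEventCount;
    }

    String tablePath() { return tablePath; }
    int sourceSubtaskId() { return sourceSubtaskId; }
    int expectedEventCount() { return expectedEventCount; }
}

/** Sink writers opt in to early per-table shutdown by implementing this hook. */
interface SupportCloseTableSinkWriter {
    void handleCloseTableEvent(CloseTableEvent event);
}
```

Carrying `expectedEventCount` in the event is what lets the sink close a table only after every participating reader has reported, rather than on the first signal.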

Why

Bounded multi-table jobs currently keep per-table sink resources alive until the whole task closes because there is no table-complete control event in the open-source pipeline.

The tricky part is parallelism: a single "table finished" event is not safe when multiple upstream readers can still have in-flight rows for the same table. This PR fixes that by carrying upstream-reader progress in the close-table event and only closing a table after all participating readers have reported completion.

Closes #10873

Validation

  • ./mvnw spotless:apply
  • ./mvnw -pl seatunnel-api -Dtest=MultiTableSinkWriterTest test
  • ./mvnw -pl seatunnel-connectors-v2/connector-jdbc -Dtest=JdbcSourceReaderTest,JdbcSourceSplitEnumeratorTest test
  • ./mvnw -pl seatunnel-transforms-v2 -Dtest=TableRenameTransformTest test
  • ./mvnw -pl seatunnel-engine/seatunnel-engine-server -DskipTests compile

Notes

  • ./mvnw -pl seatunnel-engine/seatunnel-engine-server -am -DskipTests compile still fails in the pre-existing seatunnel-config-shade module because it cannot resolve shaded typesafe-config classes. That failure is outside this PR's change set.

Contributor

@DanielLeens DanielLeens left a comment


Thanks for working on this. I pulled the latest head locally as seatunnel-review-10874 against upstream/dev and traced the full bounded JDBC multi-table path. I did not run the PR locally in this batch; this is a source-level review plus the current GitHub check metadata.

What This PR Fixes

  • User pain: in bounded multi-table JDBC jobs, the runtime can only clean up when the whole source finishes, so a table that is already fully consumed cannot release its sink-side writer/resources early.
  • Fix approach: add a per-table completion protocol: the JDBC reader reports split completion to the enumerator, the enumerator decides when a table is globally finished, the reader emits a CloseTableEvent, and the multi-table sink closes the matching per-table writer.
  • One-line summary: this PR introduces a real close-table control path so finished tables can shut down before the whole job ends.

Full Runtime Chain I Rechecked

bounded multi-table JDBC source
  -> JdbcSourceSplitEnumerator.run()
      -> splitter.generateSplits(table)
      -> addPendingSplit(...)
      -> assignSplit(...)

reader consumes split
  -> JdbcSourceReader.pollNext()
      -> inputFormat.open(split)
      -> output.collect(row)
      -> context.sendSourceEventToEnumerator(new JdbcSplitFinishedEvent(tablePath))

enumerator tracks per-table progress
  -> JdbcSourceSplitEnumerator.handleSourceEvent(...)
      -> unfinishedSplitsPerTable[table]--
      -> when remain reaches 0
           -> send JdbcTableFinishedEvent(tablePath, expectedCloseEventCount)
              to every participating reader

reader converts source event to downstream control event
  -> JdbcSourceReader.handleSourceEvent(...)
      -> pendingGlobalCloseTables.remove(table)
      -> pendingCloseTableEvents.add(new CloseTableEvent(...))

source -> transform -> sink propagation
  -> SeaTunnelSourceCollector.collect(CloseTableEvent)
      -> sendRecordToNext(new Record<>(event))
  -> TransformFlowLifeCycle.received(...)
      -> transform.mapCloseTableEvent(...)
  -> SinkFlowLifeCycle.received(...)
      -> writer.handleCloseTableEvent(event)

multi-table sink close
  -> MultiTableSinkWriter.handleCloseTableEvent(...)
      -> aggregate sourceSubtaskId acknowledgements
      -> once currentCount == requiredCount
           -> closeTable(tableId)
              -> waitUntilTableQueueDrained(tableId)
              -> remove per-table writer(s)
              -> sinkWriter.close()
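The final aggregation step in that chain can be sketched as a quorum counter. This is not the PR's actual `MultiTableSinkWriter` code; the class and method names below are illustrative assumptions that only model the "close at `currentCount == requiredCount`" behavior:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Minimal sketch of the sink-side close-table aggregation described above.
 * Names are assumptions; the real logic lives in
 * MultiTableSinkWriter.handleCloseTableEvent(...).
 */
final class CloseTableAggregator {
    // tableId -> set of source subtasks that have acknowledged completion
    private final Map<String, Set<Integer>> acks = new HashMap<>();

    /** Returns true once all expected source subtasks have acknowledged the table. */
    boolean acknowledge(String tableId, int sourceSubtaskId, int requiredCount) {
        Set<Integer> seen = acks.computeIfAbsent(tableId, k -> new HashSet<>());
        seen.add(sourceSubtaskId);           // set-based: duplicate acks do not double-count
        return seen.size() >= requiredCount; // close only at full quorum
    }
}
```

Using a set of subtask ids (rather than a raw counter) makes the aggregation idempotent under retried or duplicated events, which matters once failover paths are involved.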

Core Logic Review

Exact change

  • Source-side protocol bookkeeping:
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:51-76,151-187,215-243
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReader.java:47-50,76-109,124-135
  • Engine event forwarding:
    • seatunnel-engine/.../SeaTunnelSourceCollector.java:139-145
    • seatunnel-engine/.../TransformFlowLifeCycle.java:118-141
    • seatunnel-engine/.../SinkFlowLifeCycle.java:260-272
  • Sink-side close handling:
    • seatunnel-api/.../SupportCloseTableSinkWriter.java:24-32
    • seatunnel-api/.../MultiTableSinkWriter.java:79-85,239-268,350-425

Before / after summary

Before this PR, the JDBC reader just finished the split and closed the input format; there was no downstream “this table is now globally finished” signal.

After this PR:

  • the reader reports JdbcSplitFinishedEvent,
  • the enumerator counts unfinished splits per table,
  • the reader turns JdbcTableFinishedEvent into CloseTableEvent,
  • and the multi-table sink closes the matched table writer once all expected source subtasks have acknowledged that table.

Key findings

  • The normal no-failure bounded JDBC multi-table path does hit the new logic.
  • This is not a local helper change; it is a new cross-layer runtime protocol, so returned splits and checkpoint/restore are part of the main correctness surface.
  • The happy path is directionally right, but the returned-split accounting is wrong today.
  • The checkpoint/restore boundary is also incomplete, so the protocol can break after recovery even if it works on a fresh run.

Findings

Issue 1: returned splits are counted twice, so the final close-table event can be delayed forever

  • Location:
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:115-127
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:157-179
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:215-243
  • Severity: High
  • Why this is a blocker:
    unfinishedSplitsPerTable is supposed to represent “how many splits of this table are still unfinished”. But addSplitsBack(...) calls recordPendingSplit(...) again for a returned split, which increments the unfinished counter one more time even though the original in-flight split was never decremented. That means a returned split can permanently inflate the remaining count.
  • Risk:
    • the table never reaches zero,
    • JdbcTableFinishedEvent is never emitted,
    • the downstream CloseTableEvent never arrives,
    • and the per-table sink writer cannot be closed on failover / split-return paths.
  • Better fix:
    • Preferred: model split ownership separately from the unfinished total, and move returned splits without re-incrementing the table-level counter.
    • Smaller patch: at minimum, distinguish “brand-new split” from “returned split” so the same split does not call merge(..., 1, Integer::sum) twice.
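The "smaller patch" variant can be sketched as follows. The field name `unfinishedSplitsPerTable` is quoted from the review; the wrapper class and method names are illustrative assumptions, not the PR's actual enumerator code:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the suggested fix: only brand-new splits increment the
 * per-table unfinished counter; returned splits are re-queued without
 * touching it, so addSplitsBack(...) cannot inflate the count.
 */
final class UnfinishedSplitTracker {
    private final Map<String, Integer> unfinishedSplitsPerTable = new HashMap<>();

    /** Called exactly once when a split is first generated for a table. */
    void recordNewSplit(String table) {
        unfinishedSplitsPerTable.merge(table, 1, Integer::sum);
    }

    /** Called from addSplitsBack(...): re-queue only, no counter change. */
    void recordReturnedSplit(String table) {
        // intentionally no increment -- the original in-flight count still covers it
    }

    /** Called when a reader reports split completion; true when the table is done. */
    boolean splitFinished(String table) {
        Integer remain = unfinishedSplitsPerTable.computeIfPresent(table, (k, v) -> v - 1);
        return remain != null && remain == 0;
    }
}
```

The key invariant is that each physical split increments the counter exactly once over its lifetime, no matter how many times it bounces through assignment and return.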

Issue 2: the close-table protocol state is not checkpointed/restored end to end

  • Location:

    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumerator.java:183-187,226-243
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReader.java:47-50,106-108,124-135
    • seatunnel-engine/.../SourceFlowLifeCycle.java:395-399
  • Severity: High

  • Why this is a blocker:
    The new protocol needs more than just pending splits. Today:

    • the enumerator snapshot still only stores pendingTables + pendingSplits,
    • the reader snapshot still only stores splits,
    • unfinishedSplitsPerTable, readersPerTable, pendingCloseTableEvents, and pendingGlobalCloseTables are all in-memory only.

    That breaks recovery in two different ways:

    1. splits already assigned to readers at checkpoint time are no longer in pendingSplits, so after restore the enumerator cannot rebuild the unfinished-per-table accounting for those in-flight splits. When the restored reader later reports JdbcSplitFinishedEvent, remain == null and the event is ignored.
    2. if checkpoint happens after the reader has received JdbcTableFinishedEvent but before pollNext() emits the downstream CloseTableEvent, that pending close signal is lost too.
  • Risk:

    • a restored bounded multi-table job can behave differently from a fresh run,
    • some tables may never emit the final close-table signal after recovery,
    • sink writers stay open longer than intended or until task shutdown,
    • and the protocol is not actually checkpoint-safe in its current form.
  • Better fix:

    • Preferred: explicitly checkpoint the minimal protocol state on both sides: per-table unfinished counts / participant readers in the enumerator, plus pending close signals in the reader.
    • Alternative: redesign the recovery boundary so the enumerator can deterministically rebuild per-table unfinished state from restored reader splits instead of depending only on pendingSplits.
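The "preferred" fix amounts to widening the enumerator snapshot to carry the protocol bookkeeping. The sketch below is a hypothetical state class: the field names mirror the counters quoted in this review, but the shape of the real `pendingSplits` state and the constructor are assumptions:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical enumerator snapshot that includes the close-table protocol
 * state, so a restored job can rebuild per-table accounting for splits
 * that were in flight at checkpoint time.
 */
final class JdbcSourceEnumeratorState implements Serializable {
    final Set<String> pendingTables;                     // snapshotted today
    final Map<Integer, Set<String>> pendingSplits;       // snapshotted today
    final Map<String, Integer> unfinishedSplitsPerTable; // NEW: in-flight accounting
    final Map<String, Set<Integer>> readersPerTable;     // NEW: participant readers

    JdbcSourceEnumeratorState(
            Set<String> pendingTables,
            Map<Integer, Set<String>> pendingSplits,
            Map<String, Integer> unfinishedSplitsPerTable,
            Map<String, Set<Integer>> readersPerTable) {
        this.pendingTables = pendingTables;
        this.pendingSplits = pendingSplits;
        this.unfinishedSplitsPerTable = unfinishedSplitsPerTable;
        this.readersPerTable = readersPerTable;
    }
}
```

A symmetric change on the reader side would persist `pendingCloseTableEvents`, so a close signal received but not yet emitted through `pollNext()` survives the checkpoint.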

Issue 3: the new tests only cover the happy path, not split-return or recovery

  • Location:
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceSplitEnumeratorTest.java:209-363
    • seatunnel-connectors-v2/connector-jdbc/.../JdbcSourceReaderTest.java:39-76
    • seatunnel-api/.../MultiTableSinkWriterTest.java:69-107
  • Severity: Medium
  • Why this matters:
    The new tests prove the straight-line path, but they do not cover the two most important regression surfaces for this protocol:
    • addSplitsBack(...)
    • checkpoint/restore around pending table-close signals
  • Better fix:
    • add one regression test for split return / reassignment without double-counting unfinished splits;
    • add one recovery test where readers restore in-flight splits and still eventually produce the correct CloseTableEvent.
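The split-return regression test could look like the self-contained sketch below. In the repo it would be a JUnit case in `JdbcSourceSplitEnumeratorTest` against the real enumerator; here the counter is inlined and all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the suggested regression test: a returned split must not be
 * counted as a second unfinished split, or the table never reaches zero
 * and the close-table event never fires.
 */
final class SplitReturnRegressionSketch {
    public static void main(String[] args) {
        Map<String, Integer> unfinished = new HashMap<>();
        // table "t" gets two splits
        unfinished.merge("t", 1, Integer::sum);
        unfinished.merge("t", 1, Integer::sum);
        // one split is returned via addSplitsBack(...); the buggy path would
        // call merge(..., 1, Integer::sum) here a second time -- the fix must not
        // both splits eventually finish
        unfinished.computeIfPresent("t", (k, v) -> v - 1);
        int remain = unfinished.computeIfPresent("t", (k, v) -> v - 1);
        if (remain != 0) {
            throw new AssertionError("count inflated: close-table event would never fire");
        }
    }
}
```

The recovery test would follow the same shape: restore a reader with in-flight splits, replay completion events, and assert that exactly one `CloseTableEvent` per table still reaches the sink.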

Compatibility / Side Effects

  • API / config / defaults: no direct public user-facing break.
  • Compatibility conclusion: partially compatible.
    The external config/API surface is mostly fine, but the new internal protocol is not checkpoint-safe yet, so restored behavior is not equivalent to fresh-run behavior.
  • CPU / memory / network overhead: modest. The main concern is correctness, not raw performance.
  • Retry / idempotency:
    • sink-side close deduplication is directionally reasonable,
    • source-side returned-split accounting is not retry-safe yet.

Tests / CI

  • The added tests cover the happy path, but not the real failover/recovery edges described above.
  • The current GitHub Build check is also red. The visible failure surface includes:
    • Run / jdbc-connectors-it-part-1, whose job page annotations repeatedly show Run SeaTunnel on spark failed
    • Run / jdbc-connectors-it-part-2/3/5/6/7
    • Run / all-connectors-it-3
    • Run / paimon-connector-it
    • Run / connector-file-sftp-it
    • Run / Dead links
    • Run / unit-test (11, windows-latest)

I cannot honestly attribute every failing job to this PR from the currently visible public annotations alone, but the PR definitely does not have a trustworthy green CI signal right now.

Merge Decision

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: fix the returned-split accounting so unfinished split counts are not incremented twice.
  • Issue 2: make the close-table protocol checkpoint/restore-safe across both enumerator and reader state.
  2. Suggested non-blocking follow-up
  • Issue 3: add targeted regression coverage for split-return and recovery paths so these bugs do not reopen.

Overall, I agree with the problem this PR is trying to solve, and the event-driven direction is the right one. But this is a cross-layer runtime protocol change touching seatunnel-api, seatunnel-engine, and the JDBC source path. For that kind of change, the failover and restore semantics are part of the main path, not an edge detail. Once the returned-split accounting and recovery-state boundary are fixed, I think this design becomes much easier to trust.

