
[Improve][Transform-V2] Improve embedding model invocation reliability#10863

Open
yzeng1618 wants to merge 4 commits into apache:dev from yzeng1618:dev-embedding

Conversation

@yzeng1618
Collaborator

Purpose of this pull request

This PR improves the Embedding transform remote model invocation reliability and fixes embedding vector dimension initialization.

Does this PR introduce any user-facing change?

Yes.

This PR adds new optional Embedding transform configuration options for remote model invocation retry and timeout control. Existing jobs keep the previous behavior by default because model_retry_max_attempts defaults to 1.

It also fixes an issue where the output vector dimension could be initialized incorrectly when batch embedding responses contain multiple items. After this change, the output vector dimension is derived from the actual embedding vector length.
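The dimension fix described above can be sketched as a small helper. This is a hypothetical illustration, not the PR's actual code: the class name `DimensionProbe` and method `probeDimension` are invented here to show the idea of deriving the dimension from the first returned embedding rather than from a configured count.

```java
import java.util.List;

// Hypothetical sketch: derive the output vector dimension from the first
// embedding actually returned by the provider, instead of inferring it from
// the batch size or the number of response items.
class DimensionProbe {
    // Returns the length of the first embedding vector, or -1 when the
    // response contains no embeddings at all.
    static int probeDimension(List<float[]> embeddings) {
        if (embeddings == null || embeddings.isEmpty()) {
            return -1;
        }
        return embeddings.get(0).length;
    }
}
```

With this shape, a batch response carrying several items still yields the correct per-vector dimension, because only the vector length itself is consulted.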

How was this patch tested?

Added unit tests for the new and fixed behavior.

Check list

zengyi added 4 commits May 8, 2026 18:31
Add a shared nlpmodel invocation runtime with retry, timeout, error classification, response count validation, safe logging, metrics hooks, and cache boundary.

Route OpenAI, Doubao, Qianfan, Custom, and Zhipu embedding calls through ProviderAdapter and ModelInvocationRuntime while keeping default retry behavior compatible.

Document request-level batching, reliability options, idempotency boundaries, and cache/logging behavior for English and Chinese Embedding docs.
Contributor

@DanielLeens DanielLeens left a comment


Thanks for the contribution! I went through the full diff and the direction makes sense. The new common invocation runtime is wired into the normal embedding path correctly, the new retry/timeout options are actually exposed through the factory contract, and I did not find a blocking issue in the current revision.

What problem this PR solves

  • User pain point
    Embedding providers currently handle failures in a fragmented way. Timeout behavior, retry behavior, response-count validation, and safe diagnostics are not centralized, so users get inconsistent behavior across providers when they hit rate limits, transient 5xx responses, or mismatched output sizes.
  • Fix approach
    This PR introduces a shared ModelInvocationRuntime, ModelInvocationOptions, and ProviderAdapter boundary. Providers still own provider-specific request/response logic, but retry, timeout propagation, output-count validation, and safe logging move into one common runtime path.
  • One-line summary
    This turns embedding invocation from “each provider handles reliability separately” into “shared runtime reliability with provider-specific protocol adapters”.
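The adapter boundary described above can be sketched roughly as follows. These are assumed shapes for illustration only; the real `ProviderAdapter` and `ModelInvocationContext` in `org.apache.seatunnel.transform.nlpmodel` may differ in fields and signatures.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of the adapter boundary: the provider owns only the
// protocol (request building, response parsing); retry, timeout propagation,
// and output validation stay in the shared runtime that calls invoke().
interface ProviderAdapter<I, O> {
    O invoke(List<I> inputs, ModelInvocationContext context) throws IOException;
}

// Hypothetical per-attempt context handed to the adapter by the runtime.
class ModelInvocationContext {
    final int attempt;
    final long requestTimeoutMs;

    ModelInvocationContext(int attempt, long requestTimeoutMs) {
        this.attempt = attempt;
        this.requestTimeoutMs = requestTimeoutMs;
    }
}
```

The design benefit is that a new provider only has to supply an `invoke` implementation; it inherits retry, timeout, and validation behavior for free.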

1. Code review

1.1 Core logic analysis

Precise change scope

The main pieces are:

  • common runtime layer
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntime.java:28-196
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ProviderAdapter.java:22-38

  • config exposure
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelTransformConfig.java:81-113
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransformFactory.java:42-84

  • runtime entry points
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransform.java:89-203
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/remote/AbstractModel.java:43-77

  • provider integration and tests
    .../remote/openai/OpenAIModel.java
    .../remote/doubao/DoubaoModel.java
    .../remote/qianfan/QianfanModel.java
    .../remote/custom/CustomModel.java
    seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntimeTest.java:31-199
    seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/embedding/EmbeddingModelDimensionTest.java:51-214

Before / after

The new common entry point is:

for (int attempt = 1; attempt <= options.getRetryMaxAttempts(); attempt++) {
    ModelInvocationContext context = new ModelInvocationContext(...);
    try {
        T output = adapter.invoke(inputs, context);
        validateOutputCount(...);
        return output;
    } catch (IOException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    } catch (RuntimeException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    }
}

And the embedding factory now really exposes the new options:

.optional(
    EmbeddingTransformConfig.API_PATH,
    EmbeddingTransformConfig.SINGLE_VECTORIZED_INPUT_NUMBER,
    EmbeddingTransformConfig.PROCESS_BATCH_SIZE,
    ModelTransformConfig.MODEL_RETRY_MAX_ATTEMPTS,
    ModelTransformConfig.MODEL_RETRY_BACKOFF_MS,
    ModelTransformConfig.MODEL_RETRY_MAX_BACKOFF_MS,
    ModelTransformConfig.MODEL_REQUEST_TIMEOUT_MS)

Key findings

  1. The normal path definitely hits the new runtime, because EmbeddingTransform.open() builds ModelInvocationOptions and AbstractModel now owns ModelInvocationRuntime.
  2. The current version centralizes the three most important reliability behaviors: request timeout, retry for retryable failures, and response-count validation.
  3. The latest fix for dimension initialization is effective: dimension probing now bypasses the batch-size response-count mismatch trap where appropriate.
  4. The docs, option definitions, and factory contract are now aligned for the new retry/timeout options.
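The `model_retry_backoff_ms` / `model_retry_max_backoff_ms` pair suggests a bounded exponential backoff between attempts. The following is a guess at that policy, not the runtime's actual formula; the class and method names are invented for illustration.

```java
// Hypothetical sketch of bounded exponential backoff: the delay doubles per
// attempt starting from baseMs and is capped at maxMs, so retries cannot
// stall a job indefinitely.
class Backoff {
    // attempt is 1-based: attempt 1 waits baseMs, attempt 2 waits 2*baseMs, ...
    static long delayMs(long baseMs, long maxMs, int attempt) {
        long delay = baseMs << Math.min(attempt - 1, 30); // clamp shift to avoid overflow
        return Math.min(delay, maxMs);
    }
}
```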

Full runtime chain

Transform initialization
  -> EmbeddingTransform.open() [89-203]
      -> read provider / model / api_path / retry / timeout from config [91-124]
      -> build the concrete provider model [97-187]
      -> probe model.dimension() for output dimension [198]

Shared invocation layer
  -> AbstractModel.vectorization(...) [49-77]
      -> batchProcess(fields, singleVectorizedInputNumber) [62-76]
      -> vector(batch)
          -> provider enters invocationRuntime.invoke(...)

Common runtime
  -> ModelInvocationRuntime.invoke(...) [47-81]
      -> create ModelInvocationContext [49-57]
      -> adapter.invoke(inputs, context) [60]
      -> validateOutputCount(...) [61, 87-103]
      -> on failure normalize(...) + handleInvocationException(...) [69-72, 105-156]
          -> timeout / rate limit / temporary server failures can retry
          -> parse failures / auth failures / count mismatches do not retry

Contract exposure
  -> ModelTransformConfig defines retry / timeout options [81-113]
  -> EmbeddingTransformFactory.optionRule() exposes them [42-84]
  -> docs/en/transforms/embedding.md:31-89
  -> docs/zh/transforms/embedding.md
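The retry/no-retry split in the chain above (timeouts, rate limits, and temporary server failures retry; parse, auth, and count-mismatch failures do not) can be condensed into one predicate. This is a simplified stand-in for the runtime's error classification, with invented names:

```java
// Hypothetical classification mirroring the review's retry rules:
//  - timeouts retry,
//  - HTTP 429 (rate limit) and 5xx (temporary server failure) retry,
//  - everything else (auth 401/403, parse errors, count mismatches) fails fast.
class ErrorClassifier {
    static boolean isRetryable(int httpStatus, boolean timedOut) {
        if (timedOut) {
            return true;
        }
        return httpStatus == 429 || (httpStatus >= 500 && httpStatus < 600);
    }
}
```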

1.2 Compatibility impact

Conclusion: fully compatible.

  • API: no breaking API change
  • Configs: only optional new configs added
  • Defaults: model_retry_max_attempts = 1, so the old no-retry behavior stays intact by default
  • Protocol: providers still own their provider-specific request/response protocol
  • Serialization: unchanged
  • Historical behavior: existing jobs keep the same behavior unless they explicitly opt into retries/backoff

1.3 Performance / side effects

  • CPU / memory / GC: small added runtime overhead only at the remote invocation boundary
  • Network: retries are bounded and disabled by default
  • Concurrency: no new global mutable state introduced
  • Idempotency: the docs are explicit that repeated attempts may still be billed or have provider-side effects
  • Resource lifecycle: providers still close their own CloseableHttpClient

1.4 Error handling and logging

I did not find a blocking code issue in the current revision.

Good points here:

  1. ModelInvocationRuntime.logInvocationFailure(...) records safe diagnostic context only.
  2. The tests explicitly verify that provider response bodies with secrets/raw text are not surfaced directly in exception messages.
  3. Response-count mismatch fails fast instead of emitting misaligned vectors.
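"Safe diagnostic context" in point 1 can be illustrated with a minimal sketch: log structured metadata about the failure, never the raw response body or request payload, which may carry API keys or user text. The class and format below are hypothetical, not the PR's actual logging code.

```java
// Hypothetical sketch of safe failure diagnostics: only provider name, HTTP
// status, and attempt number appear in the message; no response body, no
// credentials, no input text.
class SafeDiag {
    static String describeFailure(String provider, int status, int attempt) {
        return String.format("provider=%s status=%d attempt=%d", provider, status, attempt);
    }
}
```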

CI note:

  • GitHub Build is green on the latest head.

2. Code quality

2.1 Code style

The new runtime classes are reasonably clear, and the core responsibility split is better than before.

2.2 Test coverage

The added tests cover the right regression surface:

  • default retry policy attempts once,
  • 429 / 5xx retry paths,
  • non-retryable auth / parse / response-count failures,
  • and the dimension-probing path that should not be rejected by batch response-count validation.
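The first bullet's regression surface (default policy attempts exactly once) can be shown with a tiny harness. This is not the project's test code; `RetryHarness` and `attemptsUsed` are invented here to demonstrate the invariant under the assumed default of `model_retry_max_attempts = 1`.

```java
import java.util.function.Supplier;

// Hypothetical mini-harness: counts how many times a call is attempted for a
// given maxAttempts. The call returns true on success, false on a retryable
// failure; success stops further attempts.
class RetryHarness {
    static int attemptsUsed(int maxAttempts, Supplier<Boolean> call) {
        int attempts = 0;
        for (int i = 1; i <= maxAttempts; i++) {
            attempts++;
            if (call.get()) {
                break; // success: no further attempts
            }
        }
        return attempts;
    }
}
```

With `maxAttempts = 1` a permanently failing call is invoked exactly once, which is the backward-compatible default the review highlights.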

2.3 Documentation

This is a user-visible behavior improvement, so docs are required, and they are present in both English and Chinese:

  • docs/en/transforms/embedding.md:31-89
  • docs/zh/transforms/embedding.md

The default behavior is also documented honestly: no retries unless the user opts in.

3. Architecture assessment

3.1 Solution quality

This is a strong long-term direction. The PR moves shared reliability concerns into one runtime instead of continuing to duplicate them inside each provider.

3.2 Maintainability

Maintainability is clearly better now. Adding a new provider no longer requires re-implementing the same retry / timeout / validation skeleton.

3.3 Extensibility

The new runtime/context/error-type split gives you a clean place for future provider growth and for later reliability improvements.

3.4 Historical compatibility

No migration burden introduced by this change.

4. Issue summary

| No. | Issue | Location | Severity |
| --- | ----- | -------- | -------- |
| -   | No formal issue found in the current revision | - | - |

5. Merge conclusion

Conclusion: can merge

  1. Blocking items

    • No code blocker from my side in the current revision.
  2. Non-blocking suggestions

    • If you want to harden this further later, I would consider adding one more regression test around transient plain network IOException cases. That is not a blocker for this PR, though.

Overall, this is more than just “adding a few config options”. It gives embedding invocation a real shared reliability boundary, keeps the old default behavior compatible, and backs the new behavior with tests and docs. From the current code path, I’m good with merge.

