[Improve][Transform-V2] Improve embedding model invocation reliability#10863
yzeng1618 wants to merge 4 commits into
Add a shared nlpmodel invocation runtime with retry, timeout, error classification, response count validation, safe logging, metrics hooks, and cache boundary. Route OpenAI, Doubao, Qianfan, Custom, and Zhipu embedding calls through ProviderAdapter and ModelInvocationRuntime while keeping default retry behavior compatible. Document request-level batching, reliability options, idempotency boundaries, and cache/logging behavior for English and Chinese Embedding docs.
…nd add comments for RAG metadata contract
DanielLeens left a comment
Thanks for the contribution! I went through the full diff and the direction makes sense. The new common invocation runtime is wired into the normal embedding path correctly, the new retry/timeout options are actually exposed through the factory contract, and I did not find a blocking issue in the current revision.
What problem this PR solves
- User pain point
  Embedding providers currently handle failures in a fragmented way. Timeout behavior, retry behavior, response-count validation, and safe diagnostics are not centralized, so users get inconsistent behavior across providers when they hit rate limits, transient 5xx responses, or mismatched output sizes.
- Fix approach
  This PR introduces a shared `ModelInvocationRuntime`, `ModelInvocationOptions`, and `ProviderAdapter` boundary. Providers still own provider-specific request/response logic, but retry, timeout propagation, output-count validation, and safe logging move into one common runtime path.
- One-line summary
  This turns embedding invocation from "each provider handles reliability separately" into "shared runtime reliability with provider-specific protocol adapters".
1. Code review
1.1 Core logic analysis
Precise change scope
The main pieces are:
- common runtime layer
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntime.java:28-196
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ProviderAdapter.java:22-38
- config exposure
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelTransformConfig.java:81-113
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransformFactory.java:42-84
- runtime entry points
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransform.java:89-203
  - seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/remote/AbstractModel.java:43-77
- provider integration and tests
  - .../remote/openai/OpenAIModel.java
  - .../remote/doubao/DoubaoModel.java
  - .../remote/qianfan/QianfanModel.java
  - .../remote/custom/CustomModel.java
  - seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntimeTest.java:31-199
  - seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/embedding/EmbeddingModelDimensionTest.java:51-214
Before / after
The new common entry point is:
```java
for (int attempt = 1; attempt <= options.getRetryMaxAttempts(); attempt++) {
    ModelInvocationContext context = new ModelInvocationContext(...);
    try {
        T output = adapter.invoke(inputs, context);
        validateOutputCount(...);
        return output;
    } catch (IOException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    } catch (RuntimeException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    }
}
```

And the embedding factory now really exposes the new options:
```java
.optional(
        EmbeddingTransformConfig.API_PATH,
        EmbeddingTransformConfig.SINGLE_VECTORIZED_INPUT_NUMBER,
        EmbeddingTransformConfig.PROCESS_BATCH_SIZE,
        ModelTransformConfig.MODEL_RETRY_MAX_ATTEMPTS,
        ModelTransformConfig.MODEL_RETRY_BACKOFF_MS,
        ModelTransformConfig.MODEL_RETRY_MAX_BACKOFF_MS,
        ModelTransformConfig.MODEL_REQUEST_TIMEOUT_MS)
```

Key findings
- The normal path definitely hits the new runtime, because `EmbeddingTransform.open()` builds `ModelInvocationOptions` and `AbstractModel` now owns `ModelInvocationRuntime`.
- The current version centralizes the three most important reliability behaviors: request timeout, retry for retryable failures, and response-count validation.
- The latest fix for dimension initialization is effective: dimension probing now bypasses the batch-size response-count mismatch trap where appropriate.
- The docs, option definitions, and factory contract are now aligned for the new retry/timeout options.
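For readers who want to see how these options would look in a job, here is a hypothetical config fragment. The four reliability keys are inferred from the option constants above and the documented `model_retry_max_attempts` default; the provider/auth keys are placeholders, not copied from this PR:

```hocon
transform {
  Embedding {
    # Placeholder provider/auth settings for illustration only.
    model_provider = "OPENAI"
    model = "text-embedding-3-small"
    api_key = "xxxx"

    # New optional reliability options (names inferred from the
    # MODEL_RETRY_* / MODEL_REQUEST_TIMEOUT_MS option constants).
    model_retry_max_attempts = 3     # default 1, i.e. no retry
    model_retry_backoff_ms = 500
    model_retry_max_backoff_ms = 5000
    model_request_timeout_ms = 30000
  }
}
```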
Full runtime chain
```
Transform initialization
  -> EmbeddingTransform.open() [89-203]
     -> read provider / model / api_path / retry / timeout from config [91-124]
     -> build the concrete provider model [97-187]
     -> probe model.dimension() for output dimension [198]

Shared invocation layer
  -> AbstractModel.vectorization(...) [49-77]
     -> batchProcess(fields, singleVectorizedInputNumber) [62-76]
        -> vector(batch)
           -> provider enters invocationRuntime.invoke(...)

Common runtime
  -> ModelInvocationRuntime.invoke(...) [47-81]
     -> create ModelInvocationContext [49-57]
     -> adapter.invoke(inputs, context) [60]
     -> validateOutputCount(...) [61, 87-103]
     -> on failure normalize(...) + handleInvocationException(...) [69-72, 105-156]
        -> timeout / rate limit / temporary server failures can retry
        -> parse failures / auth failures / count mismatches do not retry

Contract exposure
  -> ModelTransformConfig defines retry / timeout options [81-113]
  -> EmbeddingTransformFactory.optionRule() exposes them [42-84]
  -> docs/en/transforms/embedding.md:31-89
  -> docs/zh/transforms/embedding.md
```
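To make the retry/validation contract concrete, here is a minimal standalone sketch of the loop described above. The class, the adapter shape, and the string-based error classification are illustrative stand-ins, not the actual SeaTunnel `ModelInvocationRuntime` API:

```java
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of the shared retry loop: retry only transient failures,
// apply capped exponential backoff, and fail fast on output-count mismatch.
final class InvocationRuntimeSketch {
    private final int maxAttempts;
    private final long backoffMs;
    private final long maxBackoffMs;

    InvocationRuntimeSketch(int maxAttempts, long backoffMs, long maxBackoffMs) {
        this.maxAttempts = maxAttempts;
        this.backoffMs = backoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Simplified classification: timeouts, rate limits, and transient server
    // errors retry; auth, parse, and count-mismatch failures do not.
    static boolean isRetryable(RuntimeException e) {
        String msg = String.valueOf(e.getMessage());
        return msg.contains("timeout") || msg.contains("429") || msg.contains("503");
    }

    <T> List<T> invoke(List<String> inputs, Function<List<String>, List<T>> adapter) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                List<T> output = adapter.apply(inputs);
                if (output.size() != inputs.size()) {
                    // A count mismatch would silently misalign vectors with rows.
                    throw new IllegalStateException(
                            "expected " + inputs.size() + " outputs, got " + output.size());
                }
                return output;
            } catch (IllegalStateException e) {
                throw e; // response-count mismatch: never retried
            } catch (RuntimeException e) {
                if (!isRetryable(e) || attempt == maxAttempts) {
                    throw e;
                }
                // Exponential backoff, capped at maxBackoffMs.
                long sleep = Math.min(backoffMs << (attempt - 1), maxBackoffMs);
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw new IllegalStateException("unreachable: maxAttempts must be >= 1");
    }
}
```

With `maxAttempts = 1` this degenerates to a single attempt, which mirrors the compatibility guarantee the review calls out for the default configuration.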
1.2 Compatibility impact
Conclusion: fully compatible.
- API: no breaking API change
- Configs: only optional new configs added
- Defaults: `model_retry_max_attempts = 1`, so the old no-retry behavior stays intact by default
- Protocol: providers still own their provider-specific request/response protocol
- Serialization: unchanged
- Historical behavior: existing jobs keep the same behavior unless they explicitly opt into retries/backoff
1.3 Performance / side effects
- CPU / memory / GC: small added runtime overhead only at the remote invocation boundary
- Network: retries are bounded and disabled by default
- Concurrency: no new global mutable state introduced
- Idempotency: the docs are explicit that repeated attempts may still be billed or have provider-side effects
- Resource lifecycle: providers still close their own `CloseableHttpClient`
1.4 Error handling and logging
I did not find a blocking code issue in the current revision.
Good points here:
- `ModelInvocationRuntime.logInvocationFailure(...)` records safe diagnostic context only.
- The tests explicitly verify that provider response bodies with secrets/raw text are not surfaced directly in exception messages.
- Response-count mismatch fails fast instead of emitting misaligned vectors.
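The "safe diagnostic context" idea can be sketched as follows. This is a hypothetical helper, not the actual `logInvocationFailure` implementation: the point is that the log line is built from invocation metadata and the exception class only, never from the raw response body, which may carry secrets:

```java
// Hypothetical sketch of safe failure diagnostics: include provider, attempt,
// and elapsed time; deliberately exclude cause.getMessage(), which may contain
// the raw provider response body.
final class SafeFailureLog {
    static String describeFailure(
            String provider, int attempt, long elapsedMs, Exception cause) {
        return String.format(
                "embedding invocation failed: provider=%s attempt=%d elapsedMs=%d cause=%s",
                provider, attempt, elapsedMs, cause.getClass().getSimpleName());
    }
}
```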
CI note:
- The GitHub `Build` check is green on the latest head.
2. Code quality
2.1 Code style
The new runtime classes are reasonably clear, and the core responsibility split is better than before.
2.2 Test coverage
The added tests cover the right regression surface:
- default retry policy attempts once,
- 429 / 5xx retry paths,
- non-retryable auth / parse / response-count failures,
- and the dimension-probing path that should not be rejected by batch response-count validation.
2.3 Documentation
This is a user-visible behavior improvement, so docs are required, and they are present in both English and Chinese:
- docs/en/transforms/embedding.md:31-89
- docs/zh/transforms/embedding.md
The default behavior is also documented honestly: no retries unless the user opts in.
3. Architecture assessment
3.1 Solution quality
This is a strong long-term direction. The PR moves shared reliability concerns into one runtime instead of continuing to duplicate them inside each provider.
3.2 Maintainability
Maintainability is clearly better now. Adding a new provider no longer requires re-implementing the same retry / timeout / validation skeleton.
3.3 Extensibility
The new runtime/context/error-type split gives you a clean place for future provider growth and for later reliability improvements.
3.4 Historical compatibility
No migration burden introduced by this change.
4. Issue summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| - | No formal issue found in the current revision | - | - |
5. Merge conclusion
Conclusion: can merge
- Blocking items
  - No code blocker from my side in the current revision.
- Non-blocking suggestions
  - If you want to harden this further later, I would consider adding one more regression test around transient plain network `IOException` cases. That is not a blocker for this PR, though.
Overall, this is more than just “adding a few config options”. It gives embedding invocation a real shared reliability boundary, keeps the old default behavior compatible, and backs the new behavior with tests and docs. From the current code path, I’m good with merge.
Purpose of this pull request
This PR improves the Embedding transform remote model invocation reliability and fixes embedding vector dimension initialization.
Does this PR introduce any user-facing change?
Yes.
This PR adds new optional Embedding transform configuration options for remote model invocation retry and timeout control. Existing jobs keep the previous behavior by default because `model_retry_max_attempts` defaults to `1`.

It also fixes an issue where the output vector dimension could be initialized incorrectly when batch embedding responses contain multiple items. After this change, the output vector dimension is derived from the actual embedding vector length.
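The dimension fix can be illustrated with a small sketch (the class and method names are hypothetical, not the PR's code): even when a probe request comes back as a batch with several items, the output dimension should be the length of an actual vector, not anything derived from the item count:

```java
import java.util.List;

// Hypothetical sketch of deriving the output vector dimension from the
// actual embedding vector length, independent of how many items the
// batch response happens to contain.
final class DimensionProbe {
    static int dimensionOf(List<float[]> batchResponse) {
        if (batchResponse.isEmpty()) {
            throw new IllegalStateException("empty embedding response");
        }
        // The dimension is the length of a returned vector, not the
        // number of response items.
        return batchResponse.get(0).length;
    }
}
```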
How was this patch tested?
Added unit tests for the new and fixed behavior
Check list
- New License Guide
- `incompatible-changes.md` to describe the incompatibility caused by this PR.