Skip to content

Wolo/workflow fanin#819

Draft
wolo-lab wants to merge 1 commit into
wolo/workflow_enginefrom
wolo/workflow_fanin
Draft

Wolo/workflow fanin#819
wolo-lab wants to merge 1 commit into
wolo/workflow_enginefrom
wolo/workflow_fanin

Conversation

@wolo-lab
Copy link
Copy Markdown

Please ensure you have read the contribution guide before creating a pull request.

Link to Issue or Description of Change

1. Link to an existing issue (if applicable):

  • Closes: #issue_number
  • Related: #issue_number

2. Or, if no issue exists, describe the change:

If applicable, please follow the issue templates to provide as much detail as
possible.

Problem:
A clear and concise description of what the problem is.

Solution:
A clear and concise description of what you want to happen and why you choose
this solution.

Testing Plan

Please describe the tests that you ran to verify your changes. This is required
for all PRs that are not small documentation or typo fixes.

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.

Please include a summary of passed go test results.

Manual End-to-End (E2E) Tests:

Please provide instructions on how to manually test your changes, including any
necessary setup or configuration. Please provide logs or screenshots to help
reviewers better understand the fix.

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

Add any other context or screenshots about the feature request here.

…i-activation

Adds explicit fan-in support to the workflow engine while preserving
adk-python's semantics for ordinary nodes (multiple incoming edges
on a non-Join node still fire it once per upstream completion).

Engine changes:
* Scheduler keys per-task state by a monotonically increasing
  activationID instead of node name, so a node can have multiple
  concurrent activations without overwriting each other's bookkeeping.
* Per-name FIFO trigger buffer serialises activations of the same
  node, so JoinNode (and any custom WaitForOutput=true node) sees
  per-predecessor activations one at a time and can safely
  accumulate state.
* WaitForOutput honoured in handleCompletion: a completed activation
  that produced no "output" event lands in NodeWaiting and skips
  successor scheduling, deferring the join to the next predecessor.
* Per-(run,name) joinAccumulator created lazily on first activation
  and cleared once the fan-in node emits its terminal output.

API additions:
* workflow.JoinNode + workflow.NewJoinNode: aggregates per-predecessor
  inputs into a single map[string]any output once every predecessor
  declared in the graph has fired. Forces Config().WaitForOutput=true
  regardless of caller input.
* (*nodeContext).InNodes() exposes the static predecessor-name set
  derived from the graph at construction; mirrors adk-python's
  Context.in_nodes.
* graph.inNodeNamesOf for engine-internal predecessor lookup.

Validation:
* validateJoinNodesHaveIncoming + ErrJoinNodeNoIncoming reject
  graphs whose JoinNode has zero incoming edges. adk-python defers
  this to runtime; we surface it at workflow.New so misconstructed
  graphs fail to build.

Tests:
* TestScheduler_MultiActivation_NonJoinNode_RunsTwice locks in the
  Python-parity "no automatic merge" behaviour for ordinary nodes.
* TestScheduler_WaitForOutput_{StaysWaitingWithoutOutput,
  EmittingOutputSchedulesSuccessors} cover both branches of the new
  WaitForOutput path.
* TestScheduler_TriggerBuffer_SerialisesSameNameActivations asserts
  the per-name FIFO contract JoinNode relies on.
* TestJoinNode_E2E_FanIn{Two,Three}Branches exercise the canonical
  fan-in graph shapes from the API design doc.
* TestJoinNode_Run_DetachedYieldsDegenerateOutput keeps the node
  usable in unit tests that exercise Run() outside the engine.
* TestJoinNode_Run_RejectsUnknownPredecessor + the validation tests
  cover the error paths.

All existing tests still pass; the suite is race-free under
go test -race.
@wolo-lab wolo-lab changed the base branch from main to wolo/workflow_engine May 12, 2026 19:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant