feat: introduce scheduler-based engine with goroutine-per-node execution #803
Merged
…ecution

Squashes the workflow engine WIP work onto the latest wolo/workflows base. The branch had 8 incremental commits; this single commit captures the cumulative state on top of upstream PR #795 (EdgeBuilder), PR #796 (NodeConfig), and PR #797 (node name validation).

Architecture:
* Goroutine-per-node execution model: each scheduled node runs in its own goroutine, pushing events into a buffered channel.
* Single consumer goroutine (runState.run) drains the channel, applies state-side effects, yields events to the caller, and schedules successors when nodes complete.
* Replaces the legacy in-process BFS in Workflow.Run with the new scheduler, removing findNextNodes and the inline event loop.

New types and constructors:
* BaseNode (and NewBaseNode) for shared Name/Description/Config bookkeeping; FunctionNode and toolNode now embed it.
* Graph helpers in graph.go: indexed adjacency for O(1) successor lookup.
* RunState (persistable) and runState (in-process scheduler bag) in state.go; node lifecycle map (NodeStatus + per-node accumulators).
* NodeContext wraps InvocationContext with a per-node TriggeredBy accessor; agent.InvocationContext gains TriggeredBy() returning "" for non-workflow contexts (mocks updated accordingly).
* Scheduler (scheduler.go): runNode goroutine wrapper, eventQueue, cancelAll, and the run consumer loop with sibling cancellation and per-node timeout.

Routing and validation:
* findSuccessors honours unconditional edges, concrete Routes, and the Default fallback. Silent dead-ends remain intentional per adk-python parity.
* Workflow.New now returns (*Workflow, error); this picks up the name validation introduced upstream by PR #797.

NodeConfig timeout shape:
* Timeout is time.Duration (not *time.Duration), with zero meaning "inherit parent context". Matches net.Dialer.Timeout and the http.Server.*Timeout convention; keeps call sites free of pointer boilerplate.
* Adds RerunOnResumeOr / WaitForOutputOr accessor helpers for pointer-typed tri-state fields.
* Adds TestDefaultRetryConfig from upstream PR #796.

Examples and tests:
* examples/workflow/basic uses NodeConfig{RetryConfig: DefaultRetryConfig()} to demo the helper.
* 7 New(edges) callers updated for the new (*Workflow, error) signature.

Tests verified: go build ./..., go vet ./..., go test -race ./workflow/... ./agent/workflowagent/... all pass.
anFatum
approved these changes
May 13, 2026
hanorik
approved these changes
May 13, 2026
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
The legacy in-process BFS dispatcher in Workflow.Run cannot run sibling nodes
concurrently, lacks per-node timeouts and cancellation, and provides no place
to hang HITL pauses, retries, or dynamic children, all of which are upcoming
workflow milestones.
Solution:
Replace the BFS with a goroutine-per-node scheduler. Each scheduled node runs
in its own goroutine, pushes events into a buffered channel, and a single
consumer goroutine drains the channel, applies state-side effects, yields
events to the caller, and schedules successors when nodes complete.
Sibling cancellation on first error, per-node Config().Timeout, and
caller-break draining (a break out of the caller's range loop stops further
scheduling, not just yielding) are included. Because only the consumer
goroutine touches scheduler state, no mutexes are needed; the race detector
is clean.
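As a rough illustration of the producer-consumer shape described in the solution, here is a minimal sketch. It is not the scheduler from this PR: the event type, runGraph, and the successor map are all hypothetical, and error handling, timeouts, sibling cancellation, and join semantics are left out. The point is only that every node runs in its own goroutine while a single consumer owns the bookkeeping.

```go
package main

import "fmt"

// event is a hypothetical message a node goroutine sends to the consumer.
type event struct {
	node string
	done bool // true on the node's final event
}

// runGraph starts at one node and returns node names in completion order.
// succ maps a node to its successors (assumed to form a tree, no joins).
func runGraph(succ map[string][]string, start string) []string {
	events := make(chan event, 16) // buffered channel shared by all producers
	inFlight := 0                  // touched only by the consumer goroutine

	// launch is called only from the consumer, so inFlight needs no mutex.
	launch := func(name string) {
		inFlight++
		go func() {
			events <- event{node: name}             // intermediate event
			events <- event{node: name, done: true} // completion event
		}()
	}

	launch(start)
	var completed []string
	for inFlight > 0 {
		ev := <-events
		if !ev.done {
			continue // a real engine would yield this event to the caller
		}
		inFlight--
		completed = append(completed, ev.node)
		for _, next := range succ[ev.node] {
			launch(next) // schedule successors once the node completes
		}
	}
	return completed
}

func main() {
	succ := map[string][]string{"a": {"b", "c"}, "c": {"d"}}
	fmt.Println(runGraph(succ, "a")) // "a" is first; b, c, d order may vary
}
```

Since launch and the inFlight counter are used only on the consumer side, the node goroutines share nothing but the channel, which is why this shape can stay free of locks.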
Testing Plan
Unit Tests:
workflow/scheduler_test.go covers linear chains, fan-out concurrency
(barrier-based, verifies parallelism), sibling cancellation on first error,
caller-break stops scheduling (regression), per-node timeout, and the
single-output / single-routing-event accumulator contract.
Manual End-to-End (E2E) Tests:
examples/workflow/basic runs end-to-end on the new engine.
Checklist