A Nim implementation of PocketFlow, a minimalist flow-based agent programming framework with LLM integration, RAG capabilities, and orchestration features for building LLM-powered workflows.
- 🔄 Node-based workflows - Node, BatchNode, ParallelBatchNode
- 🌊 Flow orchestration - Flow, BatchFlow, ParallelBatchFlow
- ⚡ Async/await everywhere - Built on Nim's efficient asyncdispatch
- 🔁 Retry & fallback - Robust error handling with exponential backoff
- 🔗 Pythonic chaining - Clean `>>` and `-` operators
- 📦 Shared context - Type-safe JSON data sharing
- 🤖 Multiple providers - OpenAI, Anthropic Claude, Google Gemini, Ollama
- 💰 Cost tracking - Automatic token counting and cost estimation
- 💾 Smart caching - LRU cache for responses and embeddings
- 📊 Streaming support - Real-time response streaming
- 🔐 Error handling - Custom exception types with provider-specific errors
- 📄 Document chunking - Fixed-size, sentence, paragraph, semantic strategies
- 🧮 Embeddings - Generate and cache vector embeddings
- 🔍 Semantic search - Cosine similarity and top-k retrieval
- 🎯 Reranking - Query-aware result reordering
- ⏰ TimeoutNode - Execution timeout protection
- 🔀 ConditionalNode - Dynamic branching logic
- 🔁 LoopNode - Iteration with result aggregation
- 🗺️ MapNode - Concurrent mapping operations
- 📈 Observability - Structured logging, metrics, and tracing
- 💾 State persistence - Save and restore flow state
nimble install pocketflow
Or add to your .nimble file:
requires "pocketflow >= 0.2.0"
import asyncdispatch, json
import pocketflow
let step1 = newNode(
exec = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
ctx["message"] = %"Hello from PocketFlow!"
return %"done"
)
let step2 = newNode(
exec = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
echo ctx["message"].getStr()
return %"complete"
)
# Chain nodes with >> operator
discard step1 >> step2
let flow = newFlow(step1)
let ctx = newPfContext()
waitFor flow.internalRun(ctx)
import asyncdispatch, json, os
import pocketflow
let llm = newLlmClient(
provider = OpenAI,
apiKey = getEnv("OPENAI_API_KEY"),
model = "gpt-4"
)
let chatNode = newNode(
exec = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
let response = await llm.chat(ctx["prompt"].getStr())
ctx["response"] = %response
return %"done"
)
let flow = newFlow(chatNode)
let ctx = newPfContext()
ctx["prompt"] = %"Tell me a fun fact!"
waitFor flow.internalRun(ctx)
echo ctx["response"].getStr()
import pocketflow
# Chunk document
let document = readFile("document.txt")
let opts = newChunkingOptions(strategy = Sentences, chunkSize = 500)
var chunks = chunkDocument(document, opts)
# Search with embeddings
let queryEmbedding = @[0.1, 0.5, 0.3]
let topChunks = findTopK(queryEmbedding, chunks, k = 3)
# Rerank results
let reranked = rerankChunks(chunks, "search query")
echo "Top result: ", topChunks[0].chunk.text
Nodes are the building blocks of workflows. Each node has three phases:
- Prep: Prepare data (optional)
- Exec: Execute main logic (with retry support)
- Post: Process results and determine next action
let node = newNode(
prep = proc(ctx: PfContext, params: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
return %42
,
exec = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
let value = prepRes.getInt()
return %(value * 2)
,
post = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode, execRes: JsonNode): Future[string] {.async, closure, gcsafe.} =
return "next_step"
,
maxRetries = 3,
waitMs = 1000
)
Note: Always use {.async, closure, gcsafe.} pragmas for callback procs.
Flows orchestrate node execution:
# Linear flow with >> operator
discard node1 >> node2 >> node3
let flow = newFlow(node1)
# Branching flow with - operator
discard router - "success" >> successNode
discard router - "error" >> errorNode
let branchFlow = newFlow(router)
let batchNode = newBatchNode(
prep = proc(ctx: PfContext, params: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
return %[%"item1", %"item2", %"item3"]
,
execItem = proc(ctx: PfContext, params: JsonNode, item: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
return %(item.getStr().toUpperAscii())
,
post = proc(ctx: PfContext, params: JsonNode, prepRes: JsonNode, execRes: JsonNode): Future[string] {.async, closure, gcsafe.} =
ctx["results"] = execRes
return "done"
)
# Parallel processing with concurrency limit
let parallelNode = newParallelBatchNode(
prep = ...,
execItem = ...,
maxConcurrency = 5
)
let llm = newLlmClient(
provider = OpenAI,
apiKey = "sk-...",
model = "gpt-4o-mini"
)
let llm = newLlmClient(
provider = Anthropic,
apiKey = "sk-ant-...",
model = "claude-3-5-sonnet-20241022"
)
let llm = newLlmClient(
provider = Google,
apiKey = "...",
model = "gemini-1.5-flash"
)
let llm = newLlmClient(
provider = Ollama,
model = "llama3"
)
let tracker = newCostTracker()
let llm = newLlmClient(
provider = OpenAI,
apiKey = apiKey,
costTracker = tracker
)
# Use the client...
let response = await llm.chat("Hello")
# Get summary
let summary = tracker.getSummary()
echo "Total cost: $", summary["total_cost_usd"].getFloat()
let cache = newCache()
let llm = newLlmClient(
provider = OpenAI,
apiKey = apiKey,
cache = cache
)
# First call hits API
let response1 = await llm.chat("Same prompt")
# Second call returns cached result
let response2 = await llm.chat("Same prompt")
# Clear cache when needed
cache.clear()
let conditional = newConditionalNode(
condition = proc(ctx: PfContext, params: JsonNode): Future[bool] {.async, closure, gcsafe.} =
return ctx["score"].getInt() > 80
,
trueNode = highScoreNode,
falseNode = lowScoreNode
)
let loop = newLoopNode(
items = proc(ctx: PfContext, params: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
return %[%1, %2, %3, %4, %5]
,
body = processItemNode,
maxIterations = 100,
aggregateResults = true
)
# Access current item via ctx["__loop_item__"]
# Access current index via ctx["__loop_index__"]
let timeout = newTimeoutNode(
innerNode = slowOperationNode,
timeoutMs = 5000 # 5 seconds
)
let mapNode = newMapNode(
mapFunc = proc(ctx: PfContext, item: JsonNode): Future[JsonNode] {.async, closure, gcsafe.} =
return %(item.getInt() * 2)
,
maxConcurrency = 3
)
# Set items via ctx["__map_items__"]
# Results stored in ctx["__map_results__"]
let store = newStateStore(".pocketflow_state")
# Capture and save state
let state = captureState(ctx, "my_flow", %*{"version": "1.0"})
saveState(store, state)
# Load and restore state
let loadedState = loadState(store, "my_flow")
let restoredCtx = newPfContext()
restoreContext(restoredCtx, loadedState)
# Structured logging
logStructured(Info, "Processing started", [("user_id", "123")])
# Metrics
recordMetric("requests_processed", 1.0, [("status", "success")])
# Tracing with spans
let span = newSpan("expensive_operation")
# ... do work ...
finish(span)
echo "Duration: ", getDurationMs(span), "ms"
# Get metrics summary
echo getMetricsSummary().pretty()
See the examples/ directory for complete examples:
- simple_chat.nim - Basic LLM integration
- rag_example.nim - Full RAG pipeline
- advanced_flow.nim - Advanced features demo
- multi_provider.nim - Multiple LLM providers
- benchmarks.nim - Performance benchmarks
# Run all unit tests
nimble test
# Run live LLM tests (requires API keys)
nimble testlive
# Build examples
nimble examples
Test Coverage: 126 tests across 14 test files covering all modules.
PocketFlow-Nim/
├── src/pocketflow/
│ ├── context.nim # Shared context
│ ├── node.nim # Core node types
│ ├── flow.nim # Flow orchestration
│ ├── llm.nim # LLM client
│ ├── errors.nim # Exception types
│ ├── cache.nim # Caching layer
│ ├── tokens.nim # Cost tracking
│ ├── observability.nim # Logging & metrics
│ ├── rag.nim # RAG utilities
│ ├── advanced_nodes.nim # Advanced node types
│ ├── persistence.nim # State persistence
│ └── benchmark.nim # Benchmarking utilities
├── tests/ # 126 unit tests
├── examples/ # Example applications
└── docs/ # Documentation
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new features
- Ensure CI passes
- Submit a pull request
MIT License - see LICENSE file for details.
- Inspired by PocketFlow (Python)
- Built with ❤️ using Nim
- Core framework (Node, BatchNode, ParallelBatchNode)
- Flow orchestration (Flow, BatchFlow, ParallelBatchFlow)
- Multiple LLM providers (OpenAI, Anthropic, Google, Ollama)
- RAG capabilities (chunking, similarity, retrieval, reranking)
- Advanced nodes (Conditional, Loop, Timeout, Map)
- Observability (logging, metrics, tracing)
- State persistence (save, load, restore)
- Comprehensive test suite (126 tests)
- WebSocket streaming
- GraphQL API
- Vector database integrations
- Multi-agent orchestration
Star ⭐ this repo if you find it useful!