A Go framework for building stateful, graph-based workflows with support for human-in-the-loop interrupts and LLM integration.
flodk is a workflow orchestration framework that enables you to build complex, multi-step processes as directed graphs. It provides built-in support for:
- Graph-based workflows: Define nodes and edges to create complex execution flows
- State management: Thread-safe state persistence across workflow steps
- Human-in-the-loop interrupts: Pause execution and request user input with validation
- LLM integration: Built-in nodes for data extraction and processing using LLM providers
- Checkpoint/resume: Resume workflow execution from interruption points
- Conditional routing: Dynamic edge resolution based on state

Core concepts:
- Node: The basic unit of work in a workflow. Executes logic and transforms state.
- Graph: A directed graph of nodes connected by edges that define execution flow.
- Edge/EdgeResolver: Determines the next node to execute based on current state.
- Flow: Manages the execution of a graph with callbacks at various stages.
- Pipe: Supervises graph execution, manages state persistence, and handles interrupts.
- Store: Persists execution state for resumption after interrupts.
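
To make the relationship between nodes, edges, and the graph concrete, here is a minimal, self-contained sketch of a graph walker. It does not use flodk: the `State`, `Node`, `EdgeResolver`, `Graph`, and `Run` names below are hypothetical stand-ins chosen for illustration, and the real interfaces may differ.

```go
package main

import (
	"context"
	"fmt"
)

// State is a hypothetical application state used only in this sketch.
type State struct {
	Value string
	Trace []string
}

// Node is a unit of work: it receives state and returns updated state.
type Node func(ctx context.Context, s State) (State, error)

// EdgeResolver picks the next node name based on the current state.
// Returning "" ends the run.
type EdgeResolver func(ctx context.Context, s State) string

// Graph is a toy directed graph: named nodes plus one resolver per node.
type Graph struct {
	Start string
	Nodes map[string]Node
	Edges map[string]EdgeResolver
}

// Run walks the graph from Start until a node has no outgoing edge.
func Run(ctx context.Context, g Graph, s State) (State, error) {
	current := g.Start
	for current != "" {
		node, ok := g.Nodes[current]
		if !ok {
			return s, fmt.Errorf("unknown node %q", current)
		}
		next, err := node(ctx, s)
		if err != nil {
			return s, fmt.Errorf("node %q: %w", current, err)
		}
		s = next
		resolve, ok := g.Edges[current]
		if !ok {
			break // terminal node: no outgoing edge
		}
		current = resolve(ctx, s)
	}
	return s, nil
}

// visit returns a node that appends its own name to the trace.
func visit(name string) Node {
	return func(_ context.Context, s State) (State, error) {
		s.Trace = append(s.Trace, name)
		return s, nil
	}
}

// runDemo builds a three-node graph with one state-dependent edge and runs it.
func runDemo(value string) ([]string, error) {
	g := Graph{
		Start: "decision",
		Nodes: map[string]Node{
			"decision":       visit("decision"),
			"next_step":      visit("next_step"),
			"alternate_step": visit("alternate_step"),
		},
		Edges: map[string]EdgeResolver{
			"decision": func(_ context.Context, s State) string {
				if s.Value == "proceed" {
					return "next_step"
				}
				return "alternate_step"
			},
		},
	}
	out, err := Run(context.Background(), g, State{Value: value})
	return out.Trace, err
}

func main() {
	trace, err := runDemo("proceed")
	fmt.Println(trace, err) // prints "[decision next_step] <nil>"
}
```

The sketch has no cycle detection or persistence. In the framework those concerns belong to the Flow, Pipe, and Store layers described above.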
The framework uses a checkpoint-based approach to maintain execution state:
- CheckpointState: Stores the current node, visited nodes, and interrupt history
- ExecutionState: Combines checkpoint state with application-specific state
- ExecutionID: Uniquely identifies an execution (ID + flow name)
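
One way to picture how these three types fit together is the sketch below. Only the type names come from the list above. The field names and the `MemStore` type are assumptions made for illustration and are not taken from flodk's source.

```go
package main

import (
	"fmt"
	"sync"
)

// ExecutionID identifies one run: a caller-chosen ID plus the flow name.
// Both fields are strings, so the struct is usable as a map key.
type ExecutionID struct {
	ID       string
	FlowName string
}

// CheckpointState records where execution stands. Field names are assumed.
type CheckpointState struct {
	CurrentNode  string
	VisitedNodes []string
	Interrupts   []string
}

// ExecutionState pairs the checkpoint with application state of type T.
type ExecutionState[T any] struct {
	Checkpoint CheckpointState
	App        T
}

// MemStore is a hypothetical mutex-guarded in-memory store.
type MemStore[T any] struct {
	mu   sync.RWMutex
	data map[ExecutionID]ExecutionState[T]
}

// NewMemStore returns an empty store ready for use.
func NewMemStore[T any]() *MemStore[T] {
	return &MemStore[T]{data: make(map[ExecutionID]ExecutionState[T])}
}

// Save stores the state under id, replacing any previous value.
func (s *MemStore[T]) Save(id ExecutionID, st ExecutionState[T]) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[id] = st
}

// Load returns the state saved under id and whether it was found.
func (s *MemStore[T]) Load(id ExecutionID) (ExecutionState[T], bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	st, ok := s.data[id]
	return st, ok
}

func main() {
	store := NewMemStore[string]()
	id := ExecutionID{ID: "thread-123", FlowName: "my_workflow"}
	store.Save(id, ExecutionState[string]{
		Checkpoint: CheckpointState{CurrentNode: "end", VisitedNodes: []string{"start"}},
		App:        "initial",
	})
	st, ok := store.Load(id)
	fmt.Println(st.Checkpoint.CurrentNode, ok)
}
```

Keying on both the ID and the flow name means the same thread ID can be reused across different flows without collisions.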

Flow callbacks behave as follows:
- Callbacks run synchronously inside Flow.Execute. Any error returned by a callback is returned by Execute.
- When a node returns a HITLInterrupt, OnInterrupt is invoked before Execute returns. Use OnInterrupt to persist interrupt state.
- When a node succeeds, the edge resolver computes the next node, CheckpointState.CheckpointID is updated, and then OnNodeExec is invoked. Use OnNodeExec to persist the checkpoint for the next step.
- OnNodeExec is not invoked on the interrupt path. Persist state in both OnInterrupt and OnNodeExec to ensure safe resumption.
- OnGraphEnd runs once after the graph finishes. Its error is propagated.
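
The callback ordering described above can be sketched with a toy executor. This is not flodk's implementation: the `Callbacks` struct, the `step` type, `errInterrupt`, and the function signatures are hypothetical. Only the hook names and their ordering come from the text.

```go
package main

import (
	"errors"
	"fmt"
)

// errInterrupt stands in for a HITLInterrupt returned by a node.
var errInterrupt = errors.New("interrupt: input required")

// Callbacks is a hypothetical hook set named after the callbacks in the text.
type Callbacks struct {
	OnNodeExec  func(nextCheckpoint string) error
	OnInterrupt func(node string) error
	OnGraphEnd  func() error
}

// step is one node in a linear toy graph.
type step struct {
	name string
	run  func() error
}

// execute runs steps in order starting at index start.
func execute(steps []step, start int, cb Callbacks) error {
	for i := start; i < len(steps); i++ {
		if err := steps[i].run(); err != nil {
			if errors.Is(err, errInterrupt) {
				// Interrupt path: OnInterrupt fires, OnNodeExec does not.
				if cbErr := cb.OnInterrupt(steps[i].name); cbErr != nil {
					return cbErr
				}
			}
			return err
		}
		// Success path: resolve the next checkpoint first, then notify.
		next := ""
		if i+1 < len(steps) {
			next = steps[i+1].name
		}
		if err := cb.OnNodeExec(next); err != nil {
			return err // callback errors surface to the caller
		}
	}
	return cb.OnGraphEnd() // runs once; its error is propagated
}

// trace runs a two-step graph and records which callbacks fired.
// interruptAt names the step that should interrupt ("" for none).
func trace(interruptAt string) ([]string, error) {
	var events []string
	cb := Callbacks{
		OnNodeExec:  func(cp string) error { events = append(events, "node_exec next="+cp); return nil },
		OnInterrupt: func(n string) error { events = append(events, "interrupt at="+n); return nil },
		OnGraphEnd:  func() error { events = append(events, "graph_end"); return nil },
	}
	var steps []step
	for _, name := range []string{"collect", "confirm"} {
		name := name // capture per-iteration value
		steps = append(steps, step{name: name, run: func() error {
			if name == interruptAt {
				return errInterrupt
			}
			return nil
		}})
	}
	err := execute(steps, 0, cb)
	return events, err
}

func main() {
	events, err := trace("confirm")
	fmt.Println(events, err)
}
```

In this sketch an interrupted run records a checkpoint for the completed step and an interrupt for the paused one, which is why both hooks need to persist state.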
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aki-kong/flodk"
)

type MyState struct {
	Value string
}

func main() {
	ctx := context.Background()

	// Build the graph
	gb := flodk.NewGraphBuilder[MyState]()
	graph, err := gb.
		AddNode("start", flodk.Noop[MyState]()).
		AddNode("end", flodk.Noop[MyState]()).
		AddEdge("start", "end").
		SetStartNode("start").
		Build()
	if err != nil {
		log.Fatal(err)
	}

	// Create a pipe with an in-memory store
	store := flodk.NewInMemoryStore[MyState]()
	pipe := flodk.NewPipe("my_workflow", graph, store)

	// Execute the workflow
	state := MyState{Value: "initial"}
	result, err := pipe.Invoke(ctx, "thread-123", state)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", result)
}

Implement the Node interface to create custom processing steps:
type MyNode struct{}

func (n MyNode) Execute(ctx context.Context, state MyState) (MyState, error) {
	state.Value = "processed"
	return state, nil
}

Route execution based on state values:
graph, _ := gb.
	AddConditionalEdge(
		"decision",
		flodk.ConditionalFunction[MyState](func(ctx context.Context, state MyState) string {
			if state.Value == "proceed" {
				return "next_step"
			}
			return "alternate_step"
		}),
		map[string]string{
			"next_step":      "next_step",
			"alternate_step": "alternate_step",
		},
	).
	Build()

Request user input during workflow execution:
values, err := flodk.InterruptWithValidation(
	ctx,
	"Please provide your name",
	"name_required",
	flodk.Requirements{
		"name": {Type: flodk.Custom},
	},
	flodk.Requirements.Validate,
)
if err != nil {
	// Handle interrupt error
	// User will need to call pipe.Continue() with values
	return state, err
}
state.Name = values["name"]

Extract structured data using LLM providers:
llmClient := ollama.NewOllamaClient(baseURL)
extractionNode := llm.NewDataExtraction[MyState](llmClient, "model-name").
	Extract("field1", llm.DTString).
	Extract("field2", llm.DTInteger)
graph, _ := gb.
	AddNode("extract", extractionNode).
	Build()

Supported LLM providers:
- Ollama: Local LLM inference
- Custom: Implement the llm.Client interface

When an interrupt occurs, the workflow state is persisted. Resume execution with:
state, err := pipe.Continue(ctx, "thread-123", flodk.ResumeConfig{
	InterruptValues: map[string]string{
		"name": "John Doe",
	},
})

See example/main.go for a complete flight booking workflow that demonstrates:
- Data extraction with LLM
- User input collection with validation
- Conditional routing
- State persistence and resumption

Install the module with:
go get github.com/aki-kong/flodk

Licensed under the Apache License, Version 2.0. See the LICENSE file for details.