feat(workflow): add NewEmittingFunctionNode for streaming and HITL function node #1000
Open
wolo-lab wants to merge 9 commits into
…on nodes

FunctionNode previously emitted exactly one event with the function's return value, leaving HITL prompts and progress streaming impossible from a plain function body; users had to drop down to a custom Node or wrap logic in an AgentNode. This is a gap relative to adk-python, where a generator-style function node can yield RequestInput, Content, or Event objects mid-execution.

Add NewEmittingFunctionNode and NewEmittingFunctionNodeWithSchema that accept an EmittingFunctionFn[IN, OUT] whose body receives an emit callback. The shape and HITL contract mirror DynamicFn from dynamic_node.go:

- The body may emit any number of intermediate events (typically via NewRequestInputEvent for HITL).
- Returning ErrNodeInterrupted is the sentinel meaning "pause event already forwarded; do not emit a terminal event".
- Any other returned error fails the node, matching the precedence already tested for bare nodes in TestScheduler_HitlNode_ErrorAfterRequestFails.
- A nil return without error suppresses the auto-emitted terminal event for nodes whose entire payload was sent inline.

No scheduler, resume, or persistence changes are needed: the emit helper and ErrNodeInterrupted sentinel already power dynamicNode, so the existing pause/handoff/re-entry machinery applies unchanged. Default NodeConfig.RerunOnResume keeps handoff semantics consistent with adk-python's rerun_on_resume=False default in _function_node.py. The classic NewFunctionNode constructor and signature are unchanged.

Tests cover the same scenarios as hitl_test.go (pause+forward, auto-generated InterruptID, multiple parked requests, error precedence) plus a non-HITL streaming case (intermediate content event alongside the terminal output).

Simplify the workflow examples that previously hand-rolled a workflow.BaseNode subclass solely to emit a single custom event: hitl_simple and dynamic/hitl (RequestInput for HITL), and all three routing examples int/string/llm (Event.Routes). All now use NewEmittingFunctionNode.
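The return-value contract described in this commit message can be illustrated with a small self-contained sketch. It does not use the adk-go API: `Event`, `EmitFn`, `EmittingFn`, and `runEmitting` are hypothetical stand-ins defined locally, the generic `EmittingFunctionFn[IN, OUT]` is simplified to a non-generic function type, and `ErrNodeInterrupted` is redefined here only so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for the workflow event type.
type Event struct {
	Kind    string // "request_input", "content", or "output" (assumed labels)
	Payload any
}

// ErrNodeInterrupted is redefined locally; it stands in for the sentinel
// named in the commit message.
var ErrNodeInterrupted = errors.New("node interrupted")

// EmitFn forwards one intermediate event.
type EmitFn func(Event)

// EmittingFn is a simplified, non-generic stand-in for EmittingFunctionFn.
type EmittingFn func(input string, emit EmitFn) (any, error)

// runEmitting applies the contract: the sentinel pauses without a terminal
// event, any other error fails the node, a nil output suppresses the
// terminal event, and a non-nil output is appended as the terminal event.
func runEmitting(fn EmittingFn, input string) (events []Event, paused bool, err error) {
	out, err := fn(input, func(e Event) { events = append(events, e) })
	switch {
	case errors.Is(err, ErrNodeInterrupted):
		return events, true, nil // pause event was already forwarded
	case err != nil:
		return events, false, err // fails even if a request was emitted
	case out == nil:
		return events, false, nil // entire payload was sent inline
	}
	return append(events, Event{Kind: "output", Payload: out}), false, nil
}

// askApproval emits a HITL request and pauses.
func askApproval(input string, emit EmitFn) (any, error) {
	emit(Event{Kind: "request_input", Payload: "approve " + input + "?"})
	return nil, ErrNodeInterrupted
}

// failAfterRequest emits a request but then returns an ordinary error.
func failAfterRequest(input string, emit EmitFn) (any, error) {
	emit(Event{Kind: "request_input", Payload: "approve " + input + "?"})
	return nil, errors.New("lookup failed")
}

// streamThenReturn emits progress and then returns a terminal value.
func streamThenReturn(input string, emit EmitFn) (any, error) {
	emit(Event{Kind: "content", Payload: "working on " + input})
	return "done: " + input, nil
}

func main() {
	cases := []struct {
		name string
		fn   EmittingFn
	}{
		{"askApproval", askApproval},
		{"failAfterRequest", failAfterRequest},
		{"streamThenReturn", streamThenReturn},
	}
	for _, c := range cases {
		events, paused, err := runEmitting(c.fn, "deploy")
		fmt.Printf("%s: events=%d paused=%v err=%v\n", c.name, len(events), paused, err)
	}
}
```

In the real implementation the scheduler, not a helper like `runEmitting`, decides what happens after a pause; the sketch only shows which events a body's return value would produce under the rules listed above.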
…node

# Conflicts:
#   workflow/function_node.go
        yield(nil, err)
        return
    }
    if output == nil {
Collaborator
For a no-op function there is no trace of completion. To be considered: emit an event with the invocation ID and no content.
Author
Valid point. It makes sense. I've updated it for dynamic_nodes as well.
Author
I will handle it in a follow-up PR as it is a bigger change.
…node

# Conflicts:
#   examples/workflow/dynamic/hitl/main.go
#   examples/workflow/hitl_simple/main.go
#   examples/workflow/routing/int/main.go
#   examples/workflow/routing/llm/main.go
#   examples/workflow/routing/string/main.go
#   workflow/function_node.go
examples/quickstart/main was a 35MB ELF build artifact tracked in the repo (alongside its main.go source). Remove it; the source is unaffected.
…ode API

The emitting FunctionNode API (EmittingFunctionFn, the emittingFn field, and the internal wrapper) used the NodeContext alias; switch it to agent.Context directly. Updates the NewEmittingFunctionNode callers in tests and examples to match. DynamicFn keeps NodeContext (out of scope).
The emitting function node callbacks now take agent.Context, so the nc (node context) parameter name no longer fits; rename to ctx. DynamicFn callbacks keep nc (still NodeContext).
Problem

A FunctionNode emits exactly one event built from its return value, so a plain function body cannot emit a HITL RequestInput, a routing event (Event.Routes), or progress updates. Authors who needed any of these had to drop down to a custom workflow.Node (BaseNode subclass) just to call yield. adk-python supports this directly via generator-style function nodes; adk-go had no equivalent.

Solution

Add NewEmittingFunctionNode / NewEmittingFunctionNodeWithSchema taking an EmittingFunctionFn[IN, OUT] whose body receives an emit callback. The shape and HITL contract mirror DynamicFn: emit any number of intermediate events, and return ErrNodeInterrupted after a RequestInput to pause. No scheduler/resume/persistence changes are needed; the existing pause/handoff machinery applies unchanged. NewFunctionNode is untouched.

Simplifies five workflow examples (hitl_simple, dynamic/hitl, routing int/string/llm) that previously hand-rolled a BaseNode subclass solely to emit a single custom event.