Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ type Event struct {

// NodeInfo carries the per-event metadata identifying which node in
// a workflow activation emitted it.
//
// TODO(wolo): adk-python's NodeInfo also has OutputFor []string
// (fan-in re-attribution) and MessageAsOutput bool (content-as-output
// shorthand). Add as the corresponding features land in adk-go.
type NodeInfo struct {
// Path is the composite path of the emitting node within its
// workflow activation. Empty for top-level static nodes;
Expand All @@ -150,6 +146,18 @@ type NodeInfo struct {
// invariants to the emitter, allowing dynamic nodes to forward
// children's terminal events alongside their own.
Path string `json:"path,omitempty"`

// MessageAsOutput marks that this event's content IS the node's
// output: when set and Event.Output is nil, readers derive the
// node output from the event's model text. Mirrors adk-python's
// node_info.message_as_output.
MessageAsOutput bool `json:"messageAsOutput,omitempty"`

// OutputFor lists the node paths this event's Output counts for: the
// emitter plus any WithUseAsOutput delegating ancestors, so one event
// stands in for a whole delegation chain rather than each level
// re-emitting a duplicate. Mirrors adk-python's node_info.output_for.
OutputFor []string `json:"outputFor,omitempty"`
}

// RequestInput describes a single human-in-the-loop prompt emitted
Expand Down
44 changes: 39 additions & 5 deletions workflow/agent_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,40 @@ func (n *AgentNode) Run(ctx agent.InvocationContext, input any) iter.Seq2[*sessi

// synthesizeAgentOutput fills Event.Output with the event's model text
// on a final model response, so RunNode returns the agent's reply
// instead of the zero value.
//
// The event is left untouched when it is nil, already carries an
// Output, is not a final response, or has no model content (messageText
// reports ok == false). A model message whose text is empty still
// counts: Output becomes "" (a value, not "no output"), matching
// adk-python and messageAsOutput.
//
// Whenever Output is synthesized, NodeInfo.MessageAsOutput is stamped
// as well (allocating NodeInfo if needed) so live and resume readers
// know the output was derived from the model message. This mirrors
// adk-python's process_llm_agent_output, which sets event.output and
// node_info.message_as_output together.
func synthesizeAgentOutput(event *session.Event) {
	// Order matters: the nil check must short-circuit before any field
	// or method access on event.
	if event == nil || event.Output != nil || !event.IsFinalResponse() {
		return
	}

	reply, hasModelContent := messageText(event)
	if !hasModelContent {
		return
	}

	event.Output = reply
	if event.NodeInfo == nil {
		event.NodeInfo = &session.NodeInfo{}
	}
	event.NodeInfo.MessageAsOutput = true
}

// messageText concatenates the non-thought model text of an event. ok
// is false when the event carries no model content, distinguishing it
// from a model message with empty text.
func messageText(event *session.Event) (text string, ok bool) {
if event == nil {
return "", false
}
content := event.LLMResponse.Content
if content == nil || content.Role != "model" {
return
return "", false
}
var b []byte
for _, p := range content.Parts {
Expand All @@ -158,8 +181,19 @@ func synthesizeAgentOutput(event *session.Event) {
}
b = append(b, p.Text...)
}
if len(b) == 0 {
return
return string(b), true
}

// childEventOutput returns the output an event carries and whether it
// carries one at all.
//
// An explicit Event.Output always wins. Otherwise, when
// NodeInfo.MessageAsOutput is set, the output is derived from the
// event's model text via messageText; that text may be empty, in which
// case "" is still reported as an output. In every other case,
// including a nil event, it returns (nil, false).
//
// The event is never modified: this is a read-only accessor.
func childEventOutput(event *session.Event) (any, bool) {
	// Guard nil like the sibling helpers do, rather than panicking on
	// the field access below.
	if event == nil {
		return nil, false
	}
	if event.Output != nil {
		return event.Output, true
	}
	if info := event.NodeInfo; info != nil && info.MessageAsOutput {
		if text, ok := messageText(event); ok {
			return text, true
		}
	}
	return nil, false
}
6 changes: 6 additions & 0 deletions workflow/agent_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,10 @@ func TestAgentNode_SynthesizesOutputFromModelText(t *testing.T) {
if got, want := gotFinal.Output, "Hello, world!"; got != want {
t.Errorf("final event Output = %v, want %q", got, want)
}
if gotFinal.NodeInfo == nil || !gotFinal.NodeInfo.MessageAsOutput {
t.Errorf("final event NodeInfo.MessageAsOutput = %v, want true", gotFinal.NodeInfo)
}
if gotPartial.NodeInfo != nil && gotPartial.NodeInfo.MessageAsOutput {
t.Errorf("partial event MessageAsOutput = true, want false/unset")
}
}
24 changes: 15 additions & 9 deletions workflow/dynamic_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (n *dynamicNode[IN, OUT]) Run(ctx agent.InvocationContext, input any) iter.

emit := makeEmit(yield, parentNC)
sub := newDynamicSubScheduler(parentNC, n.composePath(parentNC), emit)
orchestratorCtx := newDynamicNodeContext(parentNC, sub.parentPath, "", sub)
orchestratorCtx := newDynamicNodeContext(parentNC, sub.parentPath, "", sub, sub.outputForAncestors)

out, err := n.fn(orchestratorCtx, typedInput, emit)
if err != nil {
Expand All @@ -125,18 +125,21 @@ func (n *dynamicNode[IN, OUT]) Run(ctx agent.InvocationContext, input any) iter.
return
}

// A WithUseAsOutput child already emitted this output on its own
// event (stamped for this node), so emit no duplicate terminal
// event. Mirrors adk-python's _output_delegated.
if _, delegated := sub.delegatedOutput(); delegated {
return
}

// nil output: nothing to emit as a terminal event — the body
// either produced no output or already carried it on a content
// event.
if any(out) == nil {
return
}
ev := session.NewEvent(parentNC.InvocationID())
if delegated, ok := sub.delegatedOutput(); ok {
ev.Output = delegated
} else {
ev.Output = out
}
ev.Output = out
ev.NodeInfo = &session.NodeInfo{Path: sub.parentPath}
// TODO(wolo): validate ev.Output against n.outputSchema,
// mirroring function_node.go:87-92.
Expand All @@ -162,11 +165,14 @@ func (n *dynamicNode[IN, OUT]) coerceInput(input any) (IN, error) {
return typed, nil
}

// composePath returns this dynamic node's own composite path.
//
// When this node runs as a dynamic child, the scheduler already created
// its context with the full child path ("<parent>/<name>@<runID>"), so
// a non-empty parent.Path() is returned as-is; appending Name() again
// would duplicate the node's own segment. A top-level activation has no
// parent path and gets the bare Name().
func (n *dynamicNode[IN, OUT]) composePath(parent NodeContext) string {
	if p := parent.Path(); p != "" {
		return p
	}
	return n.Name()
}
Expand Down
100 changes: 73 additions & 27 deletions workflow/dynamic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type dynamicSubScheduler struct {
parentCtx NodeContext
emitUp func(*session.Event) error

// outputForAncestors are the delegating-ancestor paths this
// activation's output also counts for, set when this dynamic node is
// itself a WithUseAsOutput child. Mirrors adk-python's
// Context._output_for_ancestors.
outputForAncestors []string

// mu guards everything below. Never held across child.Run.
mu sync.Mutex
// runCountByChild seeds the auto-counter per child name; the
Expand All @@ -44,10 +50,9 @@ type dynamicSubScheduler struct {
// outputDelegation is the at-most-one WithUseAsOutput delegation for a
// parent activation. claim is set eagerly on the first delegating child
// and never cleared within the activation (matching adk-python's
// _output_delegated); a second delegating child is rejected. A fresh
// sub-scheduler is built per activation, so there is nothing to reset
// across turns. hasValue (not value != nil) is the source of truth for
// readability, since nil is a valid delegated value.
//
// Methods require the enclosing scheduler's mu to be held.
type outputDelegation struct {
Expand Down Expand Up @@ -86,12 +91,17 @@ func (d *outputDelegation) output() (any, bool) {
}

func newDynamicSubScheduler(parent NodeContext, parentPath string, emitUp func(*session.Event) error) *dynamicSubScheduler {
var ancestors []string
if p, ok := parent.(*nodeContext); ok {
ancestors = p.outputForAncestors
}
s := &dynamicSubScheduler{
parentPath: parentPath,
parentCtx: parent,
emitUp: emitUp,
runCountByChild: map[string]int{},
resultByPath: map[string]any{},
parentPath: parentPath,
parentCtx: parent,
emitUp: emitUp,
outputForAncestors: ancestors,
runCountByChild: map[string]int{},
resultByPath: map[string]any{},
}
s.rehydrateCache()
return s
Expand Down Expand Up @@ -156,7 +166,13 @@ func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions
}

childBranch := deriveChildBranch(s.parentCtx.Branch(), name, runID, opts.useSubBranch, opts.overrideBranch)
childCtx := newDynamicNodeContext(s.parentCtx.WithBranch(childBranch), childPath, runID, s)
// A delegating child extends the chain: its own delegating children
// must count their output for this parent and its ancestors too.
var childAncestors []string
if opts.useAsOutput {
childAncestors = append([]string{s.parentPath}, s.outputForAncestors...)
}
childCtx := newDynamicNodeContext(s.parentCtx.WithBranch(childBranch), childPath, runID, s, childAncestors)

// EXPERIMENTAL: stash childCtx (a *nodeContext with non-nil
// subScheduler) in the embedded context.Context so tools running
Expand All @@ -170,6 +186,7 @@ func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions

var (
out any
hasOutput bool
interrupted bool
)
for ev, evErr := range child.Run(childCtx, input) {
Expand All @@ -186,23 +203,33 @@ func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions
// Stamp NodeInfo.Path so the top scheduler scopes the
// child's Output/Routes to the child (not the parent's
// accumulator). RequestedInput is promoted to the parent —
// see scheduler.handleEvent. Skip if the child already
// stamped NodeInfo (nested dynamic node yielding its own
// terminal event, dynamic_node.go).
// see scheduler.handleEvent. A child may set NodeInfo without
// a Path (e.g. MessageAsOutput), so fill the Path when empty
// rather than only when NodeInfo is nil; a nested dynamic node
// that already set its own Path keeps it.
if ev.NodeInfo == nil {
ev.NodeInfo = &session.NodeInfo{Path: childPath}
} else if ev.NodeInfo.Path == "" {
ev.NodeInfo.Path = childPath
}
if ev.RequestedInput != nil {
interrupted = true
}
if ev.Output != nil {
out = ev.Output
// A delegated child's output is re-emitted by the
// parent's terminal event; drop it here to avoid a
// duplicate. Partial/state-only events (Output ==
// nil) still propagate.
if opts.useAsOutput {
continue
if childOut, ok := childEventOutput(ev); ok {
out = childOut
hasOutput = true
// Stamp OutputFor so resume can attribute the output: the
// emitter's own path plus, under delegation, this parent and
// its ancestors (the parent then suppresses its own terminal
// event). Mirrors adk-python _enrich_event. A nested child
// that already stamped its chain keeps it.
if ev.NodeInfo.OutputFor == nil {
outputFor := []string{ev.NodeInfo.Path}
if opts.useAsOutput {
outputFor = append(outputFor, s.parentPath)
outputFor = append(outputFor, s.outputForAncestors...)
}
ev.NodeInfo.OutputFor = outputFor
}
}
if err := s.emitUp(ev); err != nil {
Expand All @@ -213,20 +240,39 @@ func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions
}
}

// HITL: not cached, so resume re-runs and re-invokes RunNode.
if interrupted {
// HITL is not terminal — parent re-runs on resume and is
// expected to re-invoke RunNode. Do not cache.
return nil, &NodeRunError{
ChildName: name, ChildPath: childPath, RunID: runID,
Cause: ErrNodeInterrupted,
}
return nil, s.pause(name, childPath, runID)
}

// WaitForOutput child with no output is not done yet; pause so the
// parent retries. Mirrors adk-python ctx.run_node(raise_on_wait=True).
if !hasOutput && waitsForOutput(child) {
return nil, s.pause(name, childPath, runID)
}

s.storeCachedOutput(childPath, out)
s.commitDelegation(childPath, out) // no-op unless this child claimed the delegation
return out, nil
}

// pause builds the error returned when a child did not finish this turn
// and the parent therefore has to re-run later. The result is a
// *NodeRunError identifying the child (name, path, run ID) whose Cause
// is ErrNodeInterrupted — a control sentinel rather than a failure,
// which dynamicNode.Run swallows.
func (s *dynamicSubScheduler) pause(name, childPath, runID string) error {
	paused := &NodeRunError{
		ChildName: name,
		ChildPath: childPath,
		RunID:     runID,
		Cause:     ErrNodeInterrupted,
	}
	return paused
}

// waitsForOutput reports whether node has opted into WaitForOutput.
// The config field is a tri-state pointer: nil means "unset", which
// falls back to the engine default of false; otherwise the pointed-to
// value decides.
func waitsForOutput(node Node) bool {
	if flag := node.Config().WaitForOutput; flag != nil {
		return *flag
	}
	return false
}

func (s *dynamicSubScheduler) lookupCachedOutput(childPath string) (any, bool) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
Loading