Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions internal/telemetry/functionaltest/workflow_telemetry_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,23 @@ import (
"google.golang.org/adk/internal/telemetry"
"google.golang.org/adk/internal/telemetry/telemetrytest"
"google.golang.org/adk/internal/telemetry/telemetrytestcase"
"google.golang.org/adk/session"
"google.golang.org/adk/workflow"
)

// TestTelemetrySchema_WorkflowChain runs the
// [telemetrytestcase.WorkflowChainCase] scenario end-to-end and
// asserts the emitted span tree matches the expected shape. No LLM
// is involved so no log records are emitted; the test still
// TestTelemetrySchema_WorkflowDynamic runs the
// [telemetrytestcase.WorkflowDynamicCase] scenario end-to-end and
// asserts the emitted span tree matches the expected shape.
// No LLM is involved so no log records are emitted; the test still
// installs an in-memory log exporter so a regression that suddenly
// starts emitting events would be caught by the cmp.Diff against
// the empty-Logs expectation.
// starts emitting events would be caught by the cmp.Diff against the
// empty-Logs expectation.
//
// Hermetic: no network calls, no LLMs, no GCP.
//
// Mirrors test_telemetry_schema in
// adk-python/tests/unittests/telemetry/test_node_functional.py.
func TestTelemetrySchema_WorkflowChain(t *testing.T) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have some more tests that cover error handling, e.g.:

  1. upper_node returning an error.
  2. Checking that ErrNodeInterrupted is handled properly.

func TestTelemetrySchema_Workflow(t *testing.T) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two workflow types: static and dynamic. I think we should cover both of them, so please add a new test instead of replacing the existing one.

spanExp := tracetest.NewInMemoryExporter()
telemetry.OverrideTracerForTesting(t, sdktrace.NewTracerProvider(sdktrace.WithSyncer(spanExp)))

Expand All @@ -63,10 +64,24 @@ func TestTelemetrySchema_WorkflowChain(t *testing.T) {
upperNode := workflow.NewFunctionNode("upper_node", upperFn, nodeCfg)
suffixNode := workflow.NewFunctionNode("suffix_node", suffixFn, nodeCfg)

// routerNode is a dynamic orchestrator: it expresses its execution
// order as Go code, delegating to the two function nodes via
// workflow.RunNode rather than wiring them as static graph edges.
routerNode := workflow.NewDynamicNode("router_node",
func(ctx workflow.NodeContext, in string, _ func(*session.Event) error) (string, error) {
upper, err := workflow.RunNode[string](ctx, upperNode, in)
if err != nil {
return "", err
}
return workflow.RunNode[string](ctx, suffixNode, upper)
},
nodeCfg,
)

wfAgent, err := workflowagent.New(workflowagent.Config{
Name: "my_workflow",
Description: "uppercases input then appends a suffix",
Edges: workflow.Chain(workflow.Start, upperNode, suffixNode),
Description: "dynamic orchestrator delegating to two function nodes",
Edges: workflow.Chain(workflow.Start, routerNode),
})
if err != nil {
t.Fatalf("workflowagent.New: %v", err)
Expand All @@ -75,7 +90,7 @@ func TestTelemetrySchema_WorkflowChain(t *testing.T) {
telemetrytest.RunScenario(t, wfAgent, "hello")

got := telemetrytest.BuildDigests(t, spanExp.GetSpans(), logExp.Records())
if diff := cmp.Diff(telemetrytestcase.WorkflowChainCase, got); diff != "" {
if diff := cmp.Diff(telemetrytestcase.WorkflowDynamicCase, got); diff != "" {
t.Errorf("telemetry mismatch (-want +got):\n%s", diff)
}
}
48 changes: 0 additions & 48 deletions internal/telemetry/telemetrytestcase/workflow_chain.go

This file was deleted.

66 changes: 66 additions & 0 deletions internal/telemetry/telemetrytestcase/workflow_dynamic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrytestcase

import (
"google.golang.org/adk/internal/telemetry/telemetrytest"
)

// WorkflowDynamicCase describes the span tree expected from a
// workflowagent whose graph contains a single dynamic orchestrator
// node that runs two FunctionNode children inline through
// workflow.RunNode.
//
// Expected nesting:
//
//	invoke_workflow my_workflow
//	  invoke_node router_node
//	    invoke_node upper_node
//	    invoke_node suffix_node
//
// The two delegated children must each produce their own
// "invoke_node" span, parented by the dynamic node's span; they must
// not show up as direct children of the workflow span, and they must
// not be missing. The dynamic node is itself activated by the top
// scheduler, which is why it sits directly below the workflow span.
var WorkflowDynamicCase = func() *telemetrytest.SpanDigest {
	// nodeSpan builds the digest of one "invoke_node <name>" span.
	// Calling it without children leaves Children nil, exactly as if
	// the field had been omitted from a struct literal.
	nodeSpan := func(name string, children ...*telemetrytest.SpanDigest) *telemetrytest.SpanDigest {
		return &telemetrytest.SpanDigest{
			Name: "invoke_node " + name,
			Attributes: map[string]any{
				"gen_ai.operation.name":  "invoke_node",
				"gen_ai.node.name":       name,
				"gen_ai.conversation.id": telemetrytest.PRESENT,
			},
			Children: children,
		}
	}

	// The orchestrator span owns both delegated children, in the
	// order the orchestrator invokes them.
	router := nodeSpan("router_node",
		nodeSpan("upper_node"),
		nodeSpan("suffix_node"),
	)

	return &telemetrytest.SpanDigest{
		Name: "invoke_workflow my_workflow",
		Attributes: map[string]any{
			"gen_ai.operation.name":  "invoke_workflow",
			"gen_ai.workflow.name":   "my_workflow",
			"gen_ai.conversation.id": telemetrytest.PRESENT,
		},
		Children: []*telemetrytest.SpanDigest{router},
	}
}()
25 changes: 24 additions & 1 deletion workflow/dynamic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package workflow

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"

"go.opentelemetry.io/otel/codes"

"google.golang.org/adk/session"
)

Expand Down Expand Up @@ -59,7 +63,7 @@ func newDynamicSubScheduler(parent NodeContext, parentPath string, emitUp func(*
//
// Session, invocation metadata, and cancellation come from
// s.parentCtx. opts carries the resolved RunNodeOption arguments.
func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions) (any, error) {
func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions) (result any, err error) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: the named return value is `result`, but the body returns `out, nil`. It would be a bit more consistent to use a single name (either `out` or `result`) in both the function signature and the body.

name := child.Name()
runID, err := s.resolveRunID(name, opts.customRunID)
if err != nil {
Expand All @@ -74,6 +78,25 @@ func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions
childBranch := deriveChildBranch(s.parentCtx.Branch(), name, runID, opts.useSubBranch, opts.overrideBranch)
childCtx := newDynamicNodeContext(s.parentCtx.WithBranch(childBranch), childPath, runID, s)

// Emit an "invoke_node <name>" span nested under the dynamic
// node's span (carried in s.parentCtx), so RunNode-driven
// children appear in the trace tree. startNodeSpan returns a
// *nodeContext because childCtx is one.
span, spanCtx := startNodeSpan(childCtx, child)
defer span.End()
childCtx = spanCtx.(*nodeContext)

// Record genuine runtime failures on the span. HITL pauses
// (ErrNodeInterrupted) and parent cancellation (context.Canceled)
// are expected control flow, not span errors — matching the top
// scheduler's runNode, which skips context.Canceled.
defer func() {
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrNodeInterrupted) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}()

// EXPERIMENTAL: stash childCtx (a *nodeContext with non-nil
// subScheduler) in the embedded context.Context so tools running
// inside an LlmAgent that is itself running as this dynamic
Expand Down
Loading