Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1d44536
feat: ensure durability on durable agent no matter startup means
sicoyle Sep 4, 2025
0be55cd
Merge branch 'main' into fix-da-durability
sicoyle Sep 4, 2025
f966a12
style: clean up and ensure proper agent wf state
sicoyle Sep 4, 2025
be67f71
fix: ensure telemetry works with this and clean up
sicoyle Sep 5, 2025
9ffbc5a
fix: correct exceptions i saw in debugger
sicoyle Sep 5, 2025
728be55
fix: resume workflows only if same input
sicoyle Sep 8, 2025
8601054
style: appease linter
sicoyle Sep 8, 2025
793506d
fix(tests): update field for tests
sicoyle Sep 8, 2025
aa613f8
use dapr client in python sdk for LLM
filintod Sep 8, 2025
18bee21
lint
filintod Sep 8, 2025
0295c42
Update dependencies to Dapr 1.16.0-rc.2 and refactor DaprChatClient f…
filintod Sep 15, 2025
a4f2e9b
change requirements to point to dapr-agents instead of local
filintod Sep 15, 2025
107374b
fix: push last fixes on tracing for resumed wfs
sicoyle Sep 15, 2025
ee52de0
style: make linter happy
sicoyle Sep 15, 2025
6d17c87
fix: last ditch effort to make the resumed wf tracing better
sicoyle Sep 16, 2025
8315669
style: apepase linter
sicoyle Sep 16, 2025
c08f393
fix: add agent span that does need fixing in future
sicoyle Sep 16, 2025
336e65b
style: linting for flake8
sicoyle Sep 16, 2025
590c3c1
Update quickstarts/03-durable-agent-multitool-dapr/README.md
filintod Sep 16, 2025
c5874af
Apply suggestions from code review from Sam
filintod Sep 16, 2025
373d73a
fix: make tests happy
sicoyle Sep 16, 2025
87ff4f0
updates per feedback. fix issues with rc install and version in pypro…
filintod Sep 16, 2025
e771bc4
Merge branch 'main' into filinto/add-dapr-client-tool-calling
filintod Sep 16, 2025
3d3d838
Merge branch 'main' into fix-da-durability
yaron2 Sep 16, 2025
3ceeb36
new line tox
filintod Sep 16, 2025
e5c4698
lint
filintod Sep 16, 2025
06b790b
fix: updates for quickstarts
sicoyle Sep 16, 2025
dd4db0e
style: tox -e flake8
sicoyle Sep 16, 2025
5fe54cb
fix: make tox -e type happy
sicoyle Sep 16, 2025
02e53f6
fix: make linter hapy again
sicoyle Sep 16, 2025
588e0bf
fix: make tests happy
sicoyle Sep 16, 2025
344d4e3
Merge branch 'main' into filinto/add-dapr-client-tool-calling
filintod Sep 17, 2025
e7da22c
try to fix tox test issue with imports, fix error introduced after ch…
filintod Sep 17, 2025
08e8e1a
Enhance README with component configuration details for Dapr and upda…
filintod Sep 17, 2025
cc2eaa6
minor refactor to quickstart to show where we initialize Dapr as the …
filintod Sep 17, 2025
35f77ce
style: add comments / notes to self
sicoyle Sep 17, 2025
1a88ed3
Merge remote-tracking branch 'filintod/filinto/add-dapr-client-tool-c…
sicoyle Sep 17, 2025
f38d1fe
fix: fix quickstarts as i tested the rest
sicoyle Sep 17, 2025
1c71828
style: tox -e ruff
sicoyle Sep 17, 2025
66f45b6
fix: address conflicts
sicoyle Sep 17, 2025
f5098fc
feat: bump all to v0.8.4
sicoyle Sep 17, 2025
d6de4fd
fix: allow latest release to be the default
sicoyle Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dapr_agents/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ class AgentBase(BaseModel, ABC):
max_iterations: int = Field(
default=10, description="Max iterations for conversation cycles."
)
# TODO(@Sicoyle): Rename this to make clearer
memory: MemoryBase = Field(
default_factory=ConversationListMemory,
description="Handles conversation history and context storage.",
description="Handles long-term conversation history (for all workflow instance-ids within the same session) and context storage.",
)
# TODO: we should have a system_template, prompt_template, and response_template, or better separation here.
# If we have something like a customer service agent, we want diff templates for different types of interactions.
Expand Down
385 changes: 315 additions & 70 deletions dapr_agents/agents/durableagent/agent.py

Large diffs are not rendered by default.

20 changes: 16 additions & 4 deletions dapr_agents/agents/durableagent/state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from typing import List, Optional, Dict, Any
from dapr_agents.types import MessageContent, ToolExecutionRecord
from datetime import datetime
import uuid
Expand Down Expand Up @@ -30,7 +30,7 @@ class DurableAgentWorkflowEntry(BaseModel):
description="Timestamp when the workflow was started",
)
end_time: Optional[datetime] = Field(
default_factory=datetime.now,
default=None,
description="Timestamp when the workflow was completed or failed",
)
messages: List[DurableAgentMessage] = Field(
Expand All @@ -44,9 +44,21 @@ class DurableAgentWorkflowEntry(BaseModel):
default_factory=list, description="Tool message exchanged during the workflow"
)
source: Optional[str] = Field(None, description="Entity that initiated the task.")
source_workflow_instance_id: Optional[str] = Field(
workflow_instance_id: Optional[str] = Field(
default=None,
description="The agent's own workflow instance ID.",
)
triggering_workflow_instance_id: Optional[str] = Field(
default=None,
description="The workflow instance ID of the entity that triggered this agent (for multi-agent communication).",
)
workflow_name: Optional[str] = Field(
default=None,
description="The name of the workflow.",
)
trace_context: Optional[Dict[str, Any]] = Field(
default=None,
description="The workflow instance ID associated with the original request.",
description="OpenTelemetry trace context for workflow resumption.",
)


Expand Down
9 changes: 7 additions & 2 deletions dapr_agents/llm/openai/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,10 @@ def generate(
stream=stream,
)
except Exception as e:
logger.error("ChatCompletion API error", exc_info=True)
raise ValueError("Failed to process chat completion") from e
error_type = type(e).__name__
error_msg = str(e)

logger.error(f"OpenAI ChatCompletion API error: {error_type} - {error_msg}")
logger.error("Full error details:", exc_info=True)

raise ValueError(f"OpenAI API error ({error_type}): {error_msg}") from e
10 changes: 9 additions & 1 deletion dapr_agents/observability/context_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,19 @@ def create_child_span_with_context(
Span context manager that can be used in 'with' statements for proper
span lifecycle management with restored parent-child relationships
"""
# Try to restore context from W3C format first
parent_ctx = restore_otel_context(otel_context)

if parent_ctx:
return tracer.start_as_current_span(
span_name, context=parent_ctx, attributes=attributes
)
else:
return tracer.start_as_current_span(span_name, attributes=attributes)
# Fallback: try to use current active span as parent
current_span = trace.get_current_span()
if current_span and current_span.is_recording():
# Use current span as parent by creating a child span
return tracer.start_as_current_span(span_name, attributes=attributes)
else:
# Last resort: create root span
return tracer.start_as_current_span(span_name, attributes=attributes)
86 changes: 86 additions & 0 deletions dapr_agents/observability/context_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,78 @@ def get_context(self, instance_id: str) -> Optional[Dict[str, Any]]:
logger.warning(f"⚠️ No context found for instance {instance_id}")
return context

def create_resumed_workflow_context(
    self,
    instance_id: str,
    agent_name: Optional[str] = None,
    stored_trace_context: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Create a new trace context for a resumed workflow after app restart.

    When an app restarts, the in-memory context storage is lost. This method
    starts a fresh AGENT span for the resumed workflow, captures its W3C trace
    context, and stores it under ``instance_id`` via ``self.store_context`` so
    the workflow can still be traced. The new trace is not connected to the
    original (pre-restart) trace.

    Args:
        instance_id (str): Unique workflow instance ID.
        agent_name (Optional[str]): Agent name. It is used in the span name
            (falling back to "DurableAgent" when omitted) and recorded as the
            ``agent.name`` span attribute only when provided.
        stored_trace_context (Optional[Dict[str, Any]]): Trace context that was
            persisted alongside the workflow state.
            NOTE(review): currently not used by this method.

    Returns:
        Dict[str, Any]: W3C context data for the resumed workflow, with keys
        ``traceparent``, ``tracestate``, ``instance_id``, ``resumed`` and
        ``debug_info``. If the context cannot be created, the error is logged
        and a dict with ``traceparent``/``tracestate`` set to None, plus
        ``instance_id``, ``resumed`` and ``error``, is returned instead of
        raising.
    """
    try:
        from opentelemetry import trace
        from opentelemetry.trace.propagation.tracecontext import (
            TraceContextTextMapPropagator,
        )

        # Imported before any span is started so an import failure cannot
        # leave a partially-initialised span behind.
        from .constants import OPENINFERENCE_SPAN_KIND

        # Create a new trace for the resumed workflow with a proper AGENT span.
        tracer = trace.get_tracer(__name__)

        agent_display_name = agent_name or "DurableAgent"
        span_name = f"{agent_display_name}.ToolCallingWorkflow"
        with tracer.start_as_current_span(span_name) as span:
            span.set_attribute(OPENINFERENCE_SPAN_KIND, "AGENT")
            span.set_attribute("workflow.instance_id", instance_id)
            span.set_attribute("workflow.resumed", True)
            # OpenTelemetry rejects None attribute values (the SDK logs a
            # warning and drops them), so only record the name when given.
            if agent_name is not None:
                span.set_attribute("agent.name", agent_name)

            # Inject while the AGENT span is current so the captured
            # traceparent refers to it. The span itself ends when this
            # `with` block exits; only its context is kept below.
            propagator = TraceContextTextMapPropagator()
            carrier: Dict[str, str] = {}
            propagator.inject(carrier)

            context_data = {
                "traceparent": carrier.get("traceparent"),
                "tracestate": carrier.get("tracestate"),
                "instance_id": instance_id,
                "resumed": True,
                "debug_info": f"New trace created for resumed workflow {instance_id}",
            }

            # Store the new context so later lookups by instance ID find it.
            self.store_context(instance_id, context_data)
            logger.info(
                f"Created new trace context for resumed workflow {instance_id}"
            )

            return context_data

    except Exception as e:
        # Best-effort: tracing problems must not break workflow resumption.
        logger.error(
            f"Failed to create resumed workflow context for {instance_id}: {e}",
            exc_info=True,
        )
        return {
            "traceparent": None,
            "tracestate": None,
            "instance_id": instance_id,
            "resumed": True,
            "error": str(e),
        }

def cleanup_context(self, instance_id: str) -> None:
"""
Clean up stored context for a completed workflow instance to prevent memory leaks.
Expand Down Expand Up @@ -168,6 +240,20 @@ def get_workflow_context(instance_id: str) -> Optional[Dict[str, Any]]:
return _context_storage.get_context(instance_id)


def get_all_workflow_contexts() -> Dict[str, Dict[str, Any]]:
    """
    Return a snapshot of every OpenTelemetry context in the global storage.

    Intended for debugging and as a fallback lookup when retrieving a context
    by instance ID fails due to timing issues.

    The snapshot is copied while holding the storage lock. It is a shallow
    copy: adding or removing keys in the returned dict does not affect the
    storage, but the per-instance context dicts are the same objects the
    storage still holds.

    Returns:
        Dict[str, Dict[str, Any]]: Stored contexts keyed by instance_id/key.
    """
    storage = _context_storage
    with storage._lock:
        snapshot = dict(storage._storage)
    return snapshot


def cleanup_workflow_context(instance_id: str) -> None:
"""
Clean up stored context for a completed workflow instance using the global storage.
Expand Down
Loading