Durable execution for long-running agent workflows, on top of Temporal.
You write a Python workflow that calls models, runs tools, and writes artifacts. sagaflow runs each step as a Temporal activity, so when the worker dies — or a 40-minute fan-out crashes halfway — the next launch resumes from the last completed step instead of starting over. Results land in ~/.sagaflow/INBOX.md whether or not you're still attached to the session that started them.
pip install sagaflow
temporal server start-dev &
export ANTHROPIC_API_KEY=sk-ant-...
sagaflow launch hello-world --name alice --await
# → hello, alice
Kill the terminal mid-run and re-launch the same workflow ID: it picks up from where the worker died.
- Resumes after crashes. Activity-level checkpointing via Temporal — workers, sessions, and laptops can all die without losing in-flight work.
- Decoupled from the caller. Fire-and-forget submissions land in an append-only inbox; a session-start hook surfaces unread results next time you open Claude Code.
- Provider-agnostic transport. Anthropic SDK by default; point ANTHROPIC_BASE_URL at Bedrock, a model gateway, or any compatible proxy.
- Auto-managed worker. First sagaflow launch spawns a worker daemon; sagaflow doctor reports health.
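The append-only inbox mentioned in the list above can be pictured with a small sketch. The entry format, the `append_result` and `unread_runs` helpers, and the `dismissed` set are all hypothetical; sagaflow's real INBOX.md layout may differ.

```python
import tempfile
from pathlib import Path


def append_result(inbox: Path, run_id: str, skill: str, status: str, summary: str) -> None:
    """Append one result line; existing entries are never rewritten."""
    inbox.parent.mkdir(parents=True, exist_ok=True)
    with inbox.open("a", encoding="utf-8") as fh:
        fh.write(f"- {run_id} | {skill} | {status} | {summary}\n")


def unread_runs(inbox: Path, dismissed: set[str]) -> list[str]:
    """Return run IDs present in the inbox that have not been dismissed."""
    if not inbox.exists():
        return []
    run_ids = []
    for line in inbox.read_text(encoding="utf-8").splitlines():
        if line.startswith("- "):
            run_id = line[2:].split(" | ", 1)[0]
            if run_id not in dismissed:
                run_ids.append(run_id)
    return run_ids


# Usage: two runs finish, one has been dismissed (hypothetical run IDs).
with tempfile.TemporaryDirectory() as tmp:
    inbox = Path(tmp) / "INBOX.md"
    append_result(inbox, "run-1", "hello-world", "DONE", "hello, alice")
    append_result(inbox, "run-2", "hello-world", "DONE", "hello, bob")
    unread = unread_runs(inbox, dismissed={"run-1"})
```

Opening the file in append mode means a writer never needs to read or rewrite earlier entries, which is what lets results land safely while no session is attached.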
pip install sagaflow
Requirements:
- Python 3.11+
- Temporal CLI running locally: brew install temporal && temporal server start-dev
- An Anthropic API key (or a compatible proxy via ANTHROPIC_BASE_URL)
A skill is a directory under ~/.claude/skills/<skill-name>/ containing three things:
- workflow.py: a Temporal workflow class (the durable orchestration)
- __init__.py: a register() function that wires the workflow into sagaflow
- prompts/*.md: the system/user prompts the workflow's activities load
Here is the complete hello-world skill (the one sagaflow launch hello-world --name alice runs).
~/.claude/skills/hello-world/workflow.py — the durable workflow:
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from sagaflow.durable.activities import (
        EmitFindingInput, SpawnSubagentInput, WriteArtifactInput,
    )
    from sagaflow.durable.retry_policies import HAIKU_POLICY


@dataclass(frozen=True)
class HelloWorldInput:
    run_id: str
    name: str
    inbox_path: str
    run_dir: str
    greeter_system_prompt: str
    greeter_user_prompt: str


@workflow.defn(name="HelloWorldWorkflow")
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, inp: HelloWorldInput) -> str:
        prompt_path = f"{inp.run_dir}/prompt.txt"
        await workflow.execute_activity(
            "write_artifact",
            WriteArtifactInput(path=prompt_path, content=inp.greeter_user_prompt),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        parsed = await workflow.execute_activity(
            "spawn_subagent",
            SpawnSubagentInput(
                role="greeter", tier_name="HAIKU",
                system_prompt=inp.greeter_system_prompt,
                user_prompt_path=prompt_path,
                max_tokens=64, tools_needed=False,
            ),
            start_to_close_timeout=timedelta(seconds=600),
            heartbeat_timeout=timedelta(seconds=120),
            retry_policy=HAIKU_POLICY,
        )
        greeting = parsed.get("GREETING", "hello")
        await workflow.execute_activity(
            "emit_finding",
            EmitFindingInput(
                inbox_path=inp.inbox_path, run_id=inp.run_id,
                skill="hello-world", status="DONE", summary=greeting,
                timestamp_iso=workflow.now().isoformat(timespec="seconds"),
            ),
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=HAIKU_POLICY,
        )
        return greeting

Each execute_activity call is a checkpoint. If the worker dies between them, replay resumes from the last completed one.
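To make the checkpoint idea concrete, here is a toy sketch of replay. It is not Temporal's actual implementation: `ToyRun`, `execute_step`, and the in-memory `history` list are hypothetical stand-ins for the event history that Temporal persists on the server. Results of completed steps are recorded, and a re-run returns the recorded result instead of calling the step again.

```python
from typing import Any, Callable


class ToyRun:
    """Replays recorded step results; only runs steps with no recorded result."""

    def __init__(self, history: list[Any]) -> None:
        self.history = history  # shared across attempts, like persisted state
        self._cursor = 0        # position within this attempt

    def execute_step(self, fn: Callable[[], Any]) -> Any:
        if self._cursor < len(self.history):
            result = self.history[self._cursor]  # already completed: reuse
        else:
            result = fn()                         # first time: actually run
            self.history.append(result)           # checkpoint the result
        self._cursor += 1
        return result


calls: list[str] = []  # records which steps really executed


def step(name: str) -> Callable[[], str]:
    def run() -> str:
        calls.append(name)
        return f"{name}-done"
    return run


def crash() -> str:
    raise RuntimeError("worker died")


history: list[Any] = []

# First attempt: step one completes, then the worker "dies" in step two.
first = ToyRun(history)
first.execute_step(step("write_artifact"))
try:
    first.execute_step(crash)
except RuntimeError:
    pass

# Second attempt replays the same history: step one is not re-executed.
second = ToyRun(history)
second.execute_step(step("write_artifact"))
second.execute_step(step("spawn_subagent"))
```

After both attempts, `calls` contains each step name exactly once, even though the workflow function ran twice from the top.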
~/.claude/skills/hello-world/__init__.py — registration:
from typing import Any

from sagaflow.durable.activities import emit_finding, spawn_subagent, write_artifact
from sagaflow.prompts import load_prompt
from sagaflow.registry import SkillRegistry, SkillSpec

from skills.hello_world.workflow import HelloWorldInput, HelloWorldWorkflow


def _build_input(*, run_id, run_dir, inbox_path, cli_args: dict[str, Any]) -> HelloWorldInput:
    name = str(cli_args.get("name", "world"))
    return HelloWorldInput(
        run_id=run_id, name=name,
        inbox_path=inbox_path, run_dir=run_dir,
        greeter_system_prompt=load_prompt(__file__, "greeter.system"),
        greeter_user_prompt=load_prompt(__file__, "greeter.user", substitutions={"name": name}),
    )


def register(registry: SkillRegistry) -> None:
    registry.register(SkillSpec(
        name="hello-world",
        workflow_cls=HelloWorldWorkflow,
        activities=[write_artifact, emit_finding, spawn_subagent],
        build_input=_build_input,
    ))

register() is what the worker calls at startup to discover the skill. _build_input translates CLI args (--name alice) into the workflow's input dataclass and loads prompts from disk.
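The discovery and input-building flow can be sketched with toy stand-ins. `ToySpec` and `ToyRegistry` below are hypothetical, stdlib-only approximations, not sagaflow's `SkillSpec` or `SkillRegistry`, and the run IDs are made up.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToySpec:
    name: str
    build_input: Callable[..., Any]


class ToyRegistry:
    def __init__(self) -> None:
        self._specs: dict[str, ToySpec] = {}

    def register(self, spec: ToySpec) -> None:
        # Reject duplicates so two skills cannot claim the same launch name.
        if spec.name in self._specs:
            raise ValueError(f"duplicate skill: {spec.name}")
        self._specs[spec.name] = spec

    def build(self, name: str, *, run_id: str, cli_args: dict[str, Any]) -> Any:
        # Look up the skill by its launch name and delegate to its builder.
        spec = self._specs[name]
        return spec.build_input(run_id=run_id, cli_args=cli_args)


def build_hello_input(*, run_id: str, cli_args: dict[str, Any]) -> dict[str, str]:
    # Same fallback as _build_input: use "world" when no name is supplied.
    return {"run_id": run_id, "name": str(cli_args.get("name", "world"))}


registry = ToyRegistry()
registry.register(ToySpec(name="hello-world", build_input=build_hello_input))

with_name = registry.build("hello-world", run_id="run-1", cli_args={"name": "alice"})
without_name = registry.build("hello-world", run_id="run-2", cli_args={})
```

Keeping the builder as a plain function on the spec means the launcher needs no knowledge of any skill's input type; it only passes the run metadata and parsed CLI arguments through.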
~/.claude/skills/hello-world/prompts/greeter.system.md:
You are a greeter. Output a greeting using the format
STRUCTURED_OUTPUT_START / GREETING|<text> / STRUCTURED_OUTPUT_END.
Do not include any other text.
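The workflow reads the model's reply as a mapping (`parsed.get("GREETING", "hello")`). A minimal parser for this format might look like the sketch below. It assumes the slashes in the prompt stand for line breaks and that each payload line is `KEY|value`; `parse_structured_output` is a hypothetical helper, not sagaflow's own parser.

```python
def parse_structured_output(text: str) -> dict[str, str]:
    """Collect KEY|value lines found between the START and END markers."""
    parsed: dict[str, str] = {}
    inside = False
    for raw in text.splitlines():
        line = raw.strip()
        if line == "STRUCTURED_OUTPUT_START":
            inside = True
        elif line == "STRUCTURED_OUTPUT_END":
            break
        elif inside and "|" in line:
            key, value = line.split("|", 1)  # split once so values may contain "|"
            parsed[key.strip()] = value.strip()
    return parsed


reply = "STRUCTURED_OUTPUT_START\nGREETING|hello, alice\nSTRUCTURED_OUTPUT_END"
greeting = parse_structured_output(reply).get("GREETING", "hello")

# A reply without markers yields an empty mapping, so the default applies.
fallback = parse_structured_output("no markers here").get("GREETING", "hello")
```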
~/.claude/skills/hello-world/prompts/greeter.user.md:
Greet $name
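The `$name` placeholder matches the syntax of Python's `string.Template`. Whether `load_prompt` uses it internally is an assumption; the sketch below only shows how such a substitution behaves.

```python
from string import Template

# substitute() raises KeyError if a placeholder has no value.
user_prompt = Template("Greet $name").substitute({"name": "alice"})

# safe_substitute() leaves unknown placeholders in place instead of raising.
partial = Template("Greet $name from $place").safe_substitute({"name": "alice"})
```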
That's the whole skill. sagaflow launch hello-world --name alice finds the registration, builds the input, hands the workflow to Temporal, and the worker runs it durably.
sagaflow launch <name> --arg key=value [--await] # submit a workflow
sagaflow inbox # list unread results
sagaflow dismiss <run-id> # mark as read
sagaflow doctor # diagnose temporal/worker/hook
sagaflow launch
      │
      ▼
preflight (install hook, spawn worker if missing)
      │
      ▼
Temporal (localhost:7233) ── workflow ID ── worker daemon
                                                 │
                                                 ▼
                                           activities:
                                             • model calls
                                             • file I/O
                                             • inbox emit
                                                 │
                                                 ▼
                                  ~/.sagaflow/INBOX.md + desktop notify
                                                 │
                                                 ▼
                                      next session: SessionStart
                                      hook surfaces unread runs
If the worker crashes mid-run, the next sagaflow launch (or the next worker poll) resumes from the last completed activity. Activities that already succeeded don't re-execute.
git clone https://github.com/npow/sagaflow
cd sagaflow
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
ruff check sagaflow tests
mypy sagaflow
pytest
# Opt-in end-to-end tests (require live Temporal + real Anthropic access)
SAGAFLOW_E2E=1 pytest