A Python protocol runner and work-cell orchestrator for simulated lab automation. BenchBot models robotic liquid handling (plates, wells, tip racks, transfers), runs protocols against a deterministic software simulation, and coordinates multiple heterogeneous instruments (liquid handler, incubator, plate reader) through dependency-ordered workflows with error recovery and graceful degradation. It is inspired by open-source lab-automation tooling such as PyLabRobot and PyHamilton, but is a self-contained simulator that requires no hardware.
- Deterministic, seeded fault injection (engine milestone) makes hardware errors, retries, and recovery reproducible and testable.
- Event-sourced run logs persisted to SQLite: run state is derived from an immutable event stream, which gives replay and audit trails for free.
- Stateful virtual deck tracking per-well volumes and tip state, enabling validation a naive simulator can't do.
- Validate-only / dry-run mode separating "is this protocol legal?" from "run it."
- Two authoring paths (declarative YAML/JSON and a fluent Python builder) that compile to the same validated model.
- Multi-device orchestration with graceful degradation: a work cell runs dependency-ordered workflows across several instruments; when one device fails it is quarantined and its dependents are skipped, while independent work continues (no cascading failure).
- Observability dashboard: a React/TypeScript UI lists persisted runs and draws each workflow as a DAG with the failure path highlighted, alongside device health and the event stream (see web/).
| Concern | Choice |
|---|---|
| Packaging / venv | uv |
| Models & validation | Pydantic v2 |
| Lint + format | Ruff |
| Type checking | mypy (strict) |
| Tests | pytest + coverage |
| Persistence | SQLAlchemy 2.0 async + Alembic + aiosqlite |
| API | FastAPI + uvicorn |
| CLI | Typer |
uv sync # create the venv and install everything
uv run pytest # run the test suite with coverage
uv run ruff check . # lint
uv run mypy # type-check (strict)

Validate a protocol programmatically:

from benchbot.domain import load_protocol_file, validate
protocol = load_protocol_file("examples/serial_dilution.yaml")
result = validate(protocol)
print("ok:", result.ok)
for issue in result.issues:
    print(issue)

Or build one fluently:

from benchbot.domain import ProtocolBuilder, validate
protocol = (
    ProtocolBuilder("Serial dilution")
    .add_plate("plate1", "plate_96_wellplate_200ul", slot=1)
    .add_tiprack("tips1", "tiprack_300ul", slot=2)
    .fill("plate1:A1", 200)
    .transfer("plate1:A1", "plate1:A2", 100)
    .mix("plate1:A2", 50, repeats=3)
    .build()
)
assert validate(protocol).ok

Run a protocol through the simulator:

from benchbot.domain import load_protocol_file
from benchbot.engine import SimulationRunner
result = SimulationRunner().run(load_protocol_file("examples/serial_dilution.yaml"))
print(result.status.value) # "completed" | "failed" | "invalid"
for event in result.events:
    print(event.seq, event.type)
print(result.final_state) # {"plate1:A1": 100.0, ...}

A run has three terminal statuses: invalid (rejected by static validation, never
executed), failed (a dynamic error stopped it mid-run; see result.failure),
and completed. Every run produces an ordered event stream
(run_started, step_started, step_completed, step_warning, step_failed,
run_failed, run_completed) and a final deck snapshot.
Physical actions are routed through a mock serial instrument. Inject reproducible faults to exercise retry and recovery:
from benchbot.engine import SimulationRunner, RetryPolicy
from benchbot.instruments import MockSerialInstrument, RandomFaults
instrument = MockSerialInstrument(RandomFaults(seed=7, transient_rate=0.2, hard_rate=0.02))
runner = SimulationRunner(instrument, RetryPolicy(max_attempts=3))
result = runner.run(protocol) # same seed -> byte-for-byte same run

The instrument frames each command (>ASPIRATE vol=100 well=p:A1), returns
ACK/NAK, and raises transient (NAK), timeout, or fatal hardware faults per its
FaultPolicy. Transient and timeout faults are retried with exponential backoff
(RetryScheduled events); a hardware fault or exhausted retries emits
RecoveryFailed and aborts the run. Because faults come from a seeded RNG, a
given (seed, protocol) always produces the identical event stream, so failures
are reproducible and unit-testable. Use ScriptedFaults([...]) for exact
control in tests, or NoFaults() (the default) for perfect hardware.
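
The retry behavior described above can be illustrated without BenchBot installed. The following is a minimal, self-contained sketch and not BenchBot's implementation: `SeededFaults`, `send_with_retry`, `simulate`, and the event strings are hypothetical names chosen for illustration. It shows how a private seeded RNG makes the fault sequence (and therefore the event log) repeatable, and how backoff delays double per attempt. Delays are recorded rather than slept so the example runs instantly.

```python
import random


class TransientFault(Exception):
    """Recoverable fault such as a NAK; worth retrying."""


class HardwareFault(Exception):
    """Fatal fault; never retried."""


class SeededFaults:
    """Hypothetical stand-in for a seeded fault policy."""

    def __init__(self, seed, transient_rate=0.0, hard_rate=0.0):
        self.rng = random.Random(seed)  # private RNG, so runs do not share state
        self.transient_rate = transient_rate
        self.hard_rate = hard_rate

    def check(self):
        """Raise a fault for the next command, or return None for an ACK."""
        roll = self.rng.random()
        if roll < self.hard_rate:
            raise HardwareFault("fatal hardware fault")
        if roll < self.hard_rate + self.transient_rate:
            raise TransientFault("NAK")


def send_with_retry(faults, command, max_attempts=3, base_delay_s=0.1):
    """Return (ok, events) for one command; assumes max_attempts >= 1."""
    events = []
    for attempt in range(1, max_attempts + 1):
        try:
            faults.check()
        except HardwareFault:
            events.append(f"recovery_failed {command}")
            return False, events
        except TransientFault:
            if attempt == max_attempts:
                events.append(f"recovery_failed {command}")
                return False, events
            delay = base_delay_s * 2 ** (attempt - 1)  # exponential backoff
            events.append(f"retry_scheduled {command} delay={delay:.1f}s")
            continue
        events.append(f"ack {command}")
        return True, events
    return False, events


def simulate(seed):
    """Run three commands against one seeded policy; stop at the first failure."""
    faults = SeededFaults(seed=seed, transient_rate=0.3, hard_rate=0.02)
    log = []
    for command in ["ASPIRATE", "DISPENSE", "MIX"]:
        ok, events = send_with_retry(faults, command)
        log.extend(events)
        if not ok:
            break
    return log


if __name__ == "__main__":
    print(simulate(7) == simulate(7))  # same seed, same event log
```

Because every random draw comes from the policy's own `random.Random(seed)`, two calls to `simulate` with the same seed walk through the same faults in the same order.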
Runs are stored in SQLite as an append-only event stream; a run's status is derived from its events, not stored as independent mutable state. The schema is managed by Alembic migrations.
export BENCHBOT_DATABASE_URL="sqlite+aiosqlite:///benchbot.db"
uv run alembic upgrade head # create/upgrade the schema

import asyncio
from benchbot.domain import load_protocol_file
from benchbot.engine import SimulationRunner
from benchbot.store import make_engine, make_session_factory, RunStore

async def main() -> None:
    store = RunStore(make_session_factory(make_engine()))
    protocol = load_protocol_file("examples/serial_dilution.yaml")
    result = SimulationRunner().run(protocol)
    run_id = await store.save_result(
        result, protocol_name=protocol.metadata.name, total_steps=len(protocol.steps)
    )
    print(await store.get_run(run_id)) # cached status projection
    print(await store.reconstruct_status(run_id)) # re-derived from the events

asyncio.run(main())

Persistence uses SQLAlchemy 2.0 (async) with aiosqlite. The runs.status
column is a read-model projection of project_status(events); tests assert the
two always agree. Because the only coupling to SQLite is BENCHBOT_DATABASE_URL,
moving to Postgres should mostly be a matter of changing that URL and
installing an async Postgres driver.
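
To make the idea of a derived status concrete, here is a minimal sketch of folding an event stream into a status. It is an assumption-laden illustration, not BenchBot's `project_status`: the `Event` dataclass and `project_status_sketch` are hypothetical, and the non-terminal `pending` and `running` states are invented for the example. Only the event type names come from the list above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical minimal event: a sequence number and a type string."""
    seq: int
    type: str


def project_status_sketch(events):
    """Fold events (in seq order) into a single status string."""
    status = "pending"  # assumed state before any event is seen
    for event in sorted(events, key=lambda e: e.seq):
        if event.type == "run_started":
            status = "running"  # assumed non-terminal state
        elif event.type == "run_failed":
            status = "failed"
        elif event.type == "run_completed":
            status = "completed"
        # step-level events do not change the run-level status
    return status


if __name__ == "__main__":
    stream = [
        Event(1, "run_started"),
        Event(2, "step_started"),
        Event(3, "step_completed"),
        Event(4, "run_completed"),
    ]
    print(project_status_sketch(stream))
```

Since the function is pure, a stored status column can always be checked against it by replaying the stored events, which is the kind of agreement the tests mentioned above assert.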
The benchbot CLI wraps the same engine and store:
uv run benchbot validate examples/serial_dilution.yaml # static check
uv run benchbot run examples/serial_dilution.yaml # simulate + print events
uv run benchbot run examples/serial_dilution.yaml \
--seed 7 --transient-rate 0.3 --max-attempts 5 --save # faults + persist
uv run benchbot list # persisted runs
uv run benchbot show <run_id> # run summary
uv run benchbot events <run_id> # stored event stream
uv run benchbot workcell-demo # multi-device workflow demo
uv run benchbot workcell-demo --hard-rate 1.0 --seed 1 # ... with a failing device
uv run benchbot serve --port 8000 # launch the HTTP API

validate and run exit non-zero on invalid/failed runs, so they compose in
scripts and CI.

uv run benchbot serve # or: uv run uvicorn benchbot.api.app:create_app --factory

Interactive docs are served at /docs. Endpoints:

| Method & path | Purpose |
|---|---|
| GET /health | Liveness check. |
| POST /protocols/validate | Static-validate a protocol; returns issues. |
| POST /runs | Submit + simulate a protocol; returns a run summary. |
| GET /runs | List persisted runs (most recent first). |
| GET /runs/{id} | Run status + metadata. |
| GET /runs/{id}/events | The full event stream. |
| GET /runs/{id}/diagnostics | Command/retry/recovery counts + failure + warnings. |
| POST /workflows | Run a multi-device workflow; persists it and returns per-task outcomes. |
| GET /workflows | List persisted workflow runs (most recent first). |
| GET /workflows/{id} | A workflow run: status, the DAG definition, per-task outcomes, device health. |
| GET /workflows/{id}/events | The workflow's event stream. |
| GET /workflows/{id}/export | Download a self-contained run data package (JSON). |
| GET /workcell/health | Per-device status, error rates, and quarantine state. |

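A POST /runs body can tune deterministic faults and retries through its faults (seed, transient_rate, hard_rate) and retry (max_attempts) fields, sent alongside the protocol.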
The diagnostics response makes retry/recovery observable, e.g.
{"command_count": 5, "retry_count": 2, "recovery_failures": 0, ...}.
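
Counts like these can be derived purely from the event stream. The sketch below is a hypothetical illustration rather than the API's actual code: `diagnostics_sketch` is an invented name, `command_sent` is an assumed event type (the source does not name the event that counts commands), and the `warnings` key is an assumption about how warnings might be reported.

```python
from collections import Counter


def diagnostics_sketch(event_types):
    """Aggregate a list of event type strings into diagnostic counters."""
    counts = Counter(event_types)
    return {
        "command_count": counts["command_sent"],  # assumed event type name
        "retry_count": counts["retry_scheduled"],
        "recovery_failures": counts["recovery_failed"],
        "warnings": counts["step_warning"],  # assumed response key
    }


if __name__ == "__main__":
    stream = [
        "run_started",
        "command_sent", "retry_scheduled", "command_sent",
        "step_warning",
        "run_completed",
    ]
    print(diagnostics_sketch(stream))
```

A `Counter` returns 0 for missing keys, so a run with no retries reports `retry_count: 0` without special-casing.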
For live monitoring, GET /stream/demo is a Server-Sent Events stream of a
work-cell run (paced so a browser can animate it); the
dashboard consumes it.
A read-only React + TypeScript observability UI lives in web/. It
lists persisted workflow runs and, for any run, draws the workflow as a
directed graph (nodes colored by outcome, with the failure path
highlighted) next to device health and the event stream. Drag the incubator
fault rate to 1.0 and run: the new run is degraded, the incubator node is
red, its dependent is skipped down a broken edge, and the independent task still
completes (graceful degradation, visible).

uv run benchbot serve --port 8000 # API
cd web && npm install && npm run dev # dashboard on http://localhost:5173

Each run can be exported as a data package (GET /workflows/{id}/export, or
the download button in the inspector): a self-contained JSON file with the run's
definition, per-task outcomes, device metrics derived from the event stream
(retries, errors, quarantine), and the full event stream. These are reproducible
run artifacts, not fabricated results.
Experiments are authored as code, YAML, or API calls, not in the UI: the lab is agent- and code-driven, so the dashboard is purely a monitoring lens.
A protocol is a YAML/JSON document with four sections:
version: 1
metadata: { name: "Serial dilution", author: "you" }
labware:
- { id: plate1, type: plate_96_wellplate_200ul, slot: 1 }
- { id: tips1, type: tiprack_300ul, slot: 2 }
liquids:
- { well: "plate1:A1", volume_ul: 200 }
steps:
- transfer: { source: "plate1:A1", dest: "plate1:A2", volume_ul: 100, new_tip: true }
- mix: { well: "plate1:A2", volume_ul: 50, repeats: 3 }

- Well references are "<labware id>:<well address>", e.g. plate1:A1.
- Steps accept the shorthand above or an explicit { type: transfer, ... }.
- Step kinds: transfer, aspirate, dispense, mix.

| Type | Geometry | Capacity (per well or tip) |
|---|---|---|
| plate_96_wellplate_200ul | 8 × 12 | 200 µL |
| plate_384_wellplate_50ul | 16 × 24 | 50 µL |
| tiprack_300ul | 8 × 12 | 300 µL |
| tiprack_1000ul | 8 × 12 | 1000 µL |
| reservoir_12col_15ml | 1 × 12 | 15 000 µL |

The simulated deck has slots 1–12; each slot holds one labware instance.
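
As an illustration of how a well reference can be checked against labware geometry, here is a small sketch. It is not BenchBot's parser: `GEOMETRY`, `WELL_REF`, `parse_well_ref`, and `well_exists` are hypothetical names, and the regular expression assumes single-letter rows followed by a 1-based column number. The row and column counts are copied from the table above.

```python
import re

# (rows, columns) per labware type, taken from the labware table.
GEOMETRY = {
    "plate_96_wellplate_200ul": (8, 12),
    "plate_384_wellplate_50ul": (16, 24),
    "reservoir_12col_15ml": (1, 12),
}

# "<labware id>:<well address>", e.g. "plate1:A1" (assumed address grammar).
WELL_REF = re.compile(r"^(?P<labware>[^:]+):(?P<row>[A-Z])(?P<col>[1-9][0-9]*)$")


def parse_well_ref(ref):
    """Split a well reference into (labware id, row letter, column number)."""
    match = WELL_REF.match(ref)
    if match is None:
        raise ValueError(f"E_BAD_WELL_REF: {ref!r}")
    return match["labware"], match["row"], int(match["col"])


def well_exists(labware_type, row, col):
    """True if the row/column falls inside the labware's grid."""
    rows, cols = GEOMETRY[labware_type]
    return (ord(row) - ord("A")) < rows and 1 <= col <= cols


if __name__ == "__main__":
    labware_id, row, col = parse_well_ref("plate1:H12")
    print(labware_id, well_exists("plate_96_wellplate_200ul", row, col))
```

Separating parsing from the geometry check mirrors the distinction between a malformed reference and a well-formed reference to a well that does not exist.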
Validation never raises bare strings: every finding is an Issue with a stable
code, severity, optional step_index, and location.

| Code | Meaning |
|---|---|
| E_DUP_LABWARE_ID | Two labware share an id. |
| E_UNKNOWN_LABWARE_TYPE | Labware type is not in the registry. |
| E_SLOT_OUT_OF_RANGE | Slot is outside 1–12. |
| E_SLOT_OCCUPIED | Two labware placed on the same slot. |
| E_BAD_WELL_REF | Well reference is not labware:well. |
| E_UNKNOWN_LABWARE_REF | Well references a labware id that isn't placed. |
| E_INVALID_WELL | Well address doesn't exist for that labware's geometry. |
| E_VOLUME_NOT_POSITIVE | A volume is ≤ 0. |
| E_VOLUME_EXCEEDS_CAPACITY | A volume exceeds the (smaller) well capacity. |
| E_SAME_SOURCE_DEST | Transfer source equals destination. |
| E_NO_TIPRACK | Protocol needs fresh tips but no tip rack is placed. |

See examples/invalid_protocol.yaml for a document that trips most of these.
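
The structured-finding approach can be sketched in a few lines. This is a simplified illustration under stated assumptions, not BenchBot's validator: the `Issue` field order, the `check_labware` helper, and the `labware[i]` location format are hypothetical. The codes and the 1–12 slot range come from the table above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Issue:
    """A structured finding; the field layout here is an assumption."""
    code: str
    severity: str
    location: str
    step_index: Optional[int] = None


def check_labware(labware):
    """Collect every labware issue instead of stopping at the first one."""
    issues = []
    seen_ids = set()
    used_slots = set()
    for index, item in enumerate(labware):
        location = f"labware[{index}]"  # hypothetical location format
        if item["id"] in seen_ids:
            issues.append(Issue("E_DUP_LABWARE_ID", "error", location))
        seen_ids.add(item["id"])
        if not 1 <= item["slot"] <= 12:
            issues.append(Issue("E_SLOT_OUT_OF_RANGE", "error", location))
        elif item["slot"] in used_slots:
            issues.append(Issue("E_SLOT_OCCUPIED", "error", location))
        used_slots.add(item["slot"])
    return issues


if __name__ == "__main__":
    deck = [
        {"id": "plate1", "slot": 1},
        {"id": "plate1", "slot": 13},
        {"id": "tips1", "slot": 1},
    ]
    for issue in check_labware(deck):
        print(issue.code, issue.location)
```

Returning a list of issues with stable codes lets callers filter or assert on `code` rather than matching message text.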
These depend on live deck state and can only be caught while executing:
| Code | Severity | Meaning |
|---|---|---|
| E_INSUFFICIENT_VOLUME | error | Aspirated more than the well currently holds. |
| E_OVERFILL | error | A dispense pushed a well past its capacity. |
| E_TIP_OVERFLOW | error | Aspirated more than the mounted tip can hold. |
| E_INSUFFICIENT_TIP_VOLUME | error | Dispensed more than the tip is carrying. |
| E_NO_TIP_AVAILABLE | error | All tips on the deck have been used. |
| E_NO_TIP_MOUNTED | error | Aspirate/dispense attempted without a tip. |
| W_TIP_CARRYOVER | warning | A reused tip crossed wells; possible carryover. |
| E_INSTRUMENT_NAK | error | Instrument NAK'd after retries were exhausted. |
| E_INSTRUMENT_TIMEOUT | error | Instrument timed out after retries were exhausted. |
| E_HARDWARE_FAILURE | error | Fatal hardware fault (never retried). |

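
The volume-related checks above depend on state that only exists while a run executes. A minimal sketch of that idea follows; `MiniDeck` and `DeckError` are hypothetical names, tip mounting is left out, and a single well capacity is assumed for every well. It is meant to show why these errors cannot be caught statically, not how BenchBot's deck is implemented.

```python
class DeckError(Exception):
    """Runtime error carrying one of the dynamic error codes."""

    def __init__(self, code, well):
        super().__init__(f"{code} at {well}")
        self.code = code


class MiniDeck:
    """Tracks per-well volumes and the liquid held in one mounted tip."""

    def __init__(self, well_capacity_ul, tip_capacity_ul):
        self.well_capacity_ul = well_capacity_ul
        self.tip_capacity_ul = tip_capacity_ul
        self.wells = {}  # well reference -> current volume in microliters
        self.tip_volume_ul = 0.0

    def aspirate(self, well, volume_ul):
        current = self.wells.get(well, 0.0)
        if volume_ul > current:
            raise DeckError("E_INSUFFICIENT_VOLUME", well)
        if self.tip_volume_ul + volume_ul > self.tip_capacity_ul:
            raise DeckError("E_TIP_OVERFLOW", well)
        self.wells[well] = current - volume_ul
        self.tip_volume_ul += volume_ul

    def dispense(self, well, volume_ul):
        current = self.wells.get(well, 0.0)
        if volume_ul > self.tip_volume_ul:
            raise DeckError("E_INSUFFICIENT_TIP_VOLUME", well)
        if current + volume_ul > self.well_capacity_ul:
            raise DeckError("E_OVERFILL", well)
        self.wells[well] = current + volume_ul
        self.tip_volume_ul -= volume_ul


if __name__ == "__main__":
    deck = MiniDeck(well_capacity_ul=200.0, tip_capacity_ul=300.0)
    deck.wells["plate1:A1"] = 200.0
    deck.aspirate("plate1:A1", 100.0)
    deck.dispense("plate1:A2", 100.0)
    print(deck.wells)
```

Each operation validates before mutating, so a raised `DeckError` leaves the deck in its previous state.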
A single liquid handler is rarely the whole story: real assays span several
instruments. The work cell coordinates multiple devices behind one
abstraction and runs a workflow, a DAG of tasks that each target a device, with
depends_on edges for ordering (e.g. read must run after incubate).
Devices and transports (all behind the same Instrument seam):
| Device | Kind | Transport (mock) | Tasks |
|---|---|---|---|
| lh1 | liquid handler | serial framing (>ASPIRATE …) | run_protocol |
| inc1 | incubator | TCP/JSON | incubate |
| reader1 | plate reader | TCP/JSON | read_plate |

Three layers of failure handling, from narrow to broad:
- Command retry (per device): transient NAK/timeout retried with backoff.
- Task recovery (RecoveryPolicy): when a task fails after retries, decide per failure code whether to RETRY the task, SKIP it (quarantine the device, keep going), or HALT the workflow. Default is SKIP.
- Device quarantine: a SKIP'd failure marks the device DOWN; its dependent tasks are skipped, but independent tasks keep running. One instrument failing never cascades through the cell.

from benchbot.workcell import WorkCell, Workflow, IncubateTask, ReadPlateTask, build_default_workcell
cell = build_default_workcell()
workflow = Workflow(name="assay", tasks=[
    IncubateTask(id="incubate", device="inc1", minutes=30, celsius=37),
    ReadPlateTask(id="read", device="reader1", plate="p", depends_on=["incubate"]),
])
result = cell.run_workflow(workflow)
print(result.status) # completed | degraded | halted | invalid
print(cell.health()) # per-device status + error rates

Try uv run benchbot workcell-demo --hard-rate 1.0 to watch the incubator fail,
get quarantined, its dependent get skipped, and the independent liquid-handler
task still complete (status degraded, not failed). Workflow validation has
its own codes: E_UNKNOWN_DEVICE, E_DEVICE_KIND_MISMATCH,
E_UNKNOWN_DEPENDENCY, E_DEPENDENCY_CYCLE, E_DUP_TASK_ID, E_SELF_DEPENDENCY.
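
The combination of dependency ordering, quarantine, and skip-on-failure can be sketched with the standard library's `graphlib` (Python 3.9+). This is an illustrative model only: `run_workflow_sketch`, the task dictionary shape, and the `failing_devices` argument are hypothetical, the default SKIP policy is hard-coded, and dependencies are assumed to reference known task ids (the real validator reports E_UNKNOWN_DEPENDENCY for that case).

```python
from graphlib import CycleError, TopologicalSorter


def run_workflow_sketch(tasks, failing_devices):
    """tasks maps task id -> (device id, [ids this task depends on])."""
    graph = {task_id: set(deps) for task_id, (_, deps) in tasks.items()}
    try:
        order = list(TopologicalSorter(graph).static_order())
    except CycleError:
        return "invalid", {}  # corresponds to E_DEPENDENCY_CYCLE

    down = set()  # quarantined devices
    outcomes = {}
    for task_id in order:  # dependencies always come before dependents
        device, deps = tasks[task_id]
        if any(outcomes[dep] != "completed" for dep in deps):
            outcomes[task_id] = "skipped"  # an upstream task did not finish
        elif device in down:
            outcomes[task_id] = "skipped"  # device already quarantined
        elif device in failing_devices:
            down.add(device)  # SKIP policy: quarantine and keep going
            outcomes[task_id] = "failed"
        else:
            outcomes[task_id] = "completed"

    all_ok = all(outcome == "completed" for outcome in outcomes.values())
    return ("completed" if all_ok else "degraded"), outcomes


if __name__ == "__main__":
    assay = {
        "dilute": ("lh1", []),
        "incubate": ("inc1", []),
        "read": ("reader1", ["incubate"]),
    }
    print(run_workflow_sketch(assay, failing_devices={"inc1"}))
```

With the incubator failing, `incubate` fails, `read` is skipped because its dependency did not complete, and `dilute` still completes, which is the degraded (not failed) outcome described above.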
The simulation is intentionally a faithful-but-bounded model. Explicit assumptions:
- A single deck with 12 slots; exactly one labware instance per slot.
- Single-channel pipetting: one well aspirated/dispensed at a time.
- One mounted tip at a time; a fresh tip starts empty. Reusing a tip across different source wells is allowed but flagged (W_TIP_CARRYOVER).
- Volumes are in microliters; well geometry uses single-letter rows (A–Z).
- Liquids are tracked only by volume, not by species/concentration; there is no evaporation, mixing kinetics, or temperature.
- No physical timing or collision modeling: steps execute logically, not in wall-clock time. Instrument latency is abstracted into the fault policy.
- The mock instrument models the communication channel (frames, ACK/NAK, faults), not motor kinematics.
- The work cell executes tasks sequentially in dependency order: the focus is dependency ordering and failure isolation, not a real-time scheduler for overlapping device operations.
- Live work-cell state (device health, counters) is in-memory, but workflow runs are persisted to SQLite via the same event-sourced approach as single-device runs (the submitted DAG, per-task outcomes, device-health snapshot, and the event stream), so they can be listed and inspected later.
BenchBot is designed so every failure is reproducible. Static failures are deterministic by construction; runtime/hardware failures are deterministic given a seed.
| Scenario | How to reproduce |
|---|---|
| Static validation errors | uv run benchbot validate examples/invalid_protocol.yaml (exits 1, prints every E_* code). |
| Aspirate from an under-filled well | A transfer whose volume exceeds the source's current volume → E_INSUFFICIENT_VOLUME. |
| Overfill a destination | Transfer into a well already near capacity → E_OVERFILL. |
| Tip carryover warning | Reuse a tip across two source wells (new_tip: false) → W_TIP_CARRYOVER (run still completes). |
| Transient fault that recovers | uv run benchbot run examples/serial_dilution.yaml --seed 7 --transient-rate 0.3; watch retry_scheduled events; run completes. |
| Unrecoverable hardware fault | uv run benchbot run examples/serial_dilution.yaml --seed 1 --hard-rate 1.0 → recovery_failed, exits 1. |
| Retries exhausted | --transient-rate 1.0 --max-attempts 2 → every attempt NAKs → E_INSTRUMENT_NAK. |

Because faults come from a seeded RNG, re-running any command with the same
--seed (and rates) reproduces the identical event stream, including over the
HTTP API via the faults/retry request fields.
docker compose up --build # builds, migrates, serves on :8000
curl localhost:8000/health

The image installs dependencies from uv.lock (reproducible), runs
alembic upgrade head on startup, then serves via uvicorn. Run history is
persisted to a named volume (benchbot-data → /data/benchbot.db), so it
survives restarts. A HEALTHCHECK probes /health.
uv sync # install runtime + dev dependencies
uv run pytest # tests + coverage
uv run ruff check . && uv run ruff format --check . # lint + format
uv run mypy # strict type check
uv run alembic revision --autogenerate -m "msg" # new migration
uv run alembic upgrade head # apply migrations

.github/workflows/ci.yml runs on every push/PR: ruff lint, ruff format check,
mypy (strict), the pytest suite, and an alembic upgrade head + alembic check
step that fails the build if the migrations ever drift from the ORM models.
src/benchbot/domain/ # pure models + validation (no I/O)
  errors.py # Issue / ValidationResult / exceptions
  labware.py # labware definitions, geometry, registry
  protocol.py # protocol model + fluent builder
  loader.py # YAML/JSON parsing
  validation.py # static validation
src/benchbot/engine/ # stateful simulation (depends only on domain)
  deck.py # virtual deck: well volumes, tips, pipette
  events.py # run event types + in-memory event log
  runner.py # step executor + dynamic validation + instrument I/O
  retry.py # retry policy with exponential backoff
src/benchbot/instruments/ # the hardware seam (depends on domain)
  base.py # Instrument interface, Command/Ack frames, error types
  faults.py # deterministic fault policies (seeded / scripted)
  mock_base.py # shared fault/ACK semantics for mock instruments
  mock_serial.py # serial-framed instrument (liquid handler)
  mock_tcp.py # TCP/JSON instrument (reader, incubator)
src/benchbot/workcell/ # multi-device orchestration (depends on engine)
  devices.py # Device: instrument + kind + health + counters
  workflow.py # Workflow DAG, tasks, validation, topological order
  recovery.py # per-failure-mode recovery policy (retry/skip/halt)
  cell.py # WorkCell: schedule, recover, quarantine, health
  events.py # workflow event types + log
src/benchbot/store/ # persistence (depends on engine + domain)
  models.py # SQLAlchemy ORM: runs + workflow_runs + event tables
  db.py # async engine / session / URL config
  repository.py # RunStore + WorkflowStore: save, load events, reconstruct
  projections.py # derive run/workflow status from the event stream
src/benchbot/api/ # FastAPI service (thin adapter over engine + store)
  app.py # application factory + lifespan-managed store
  routes.py # endpoints
  schemas.py # request/response models
src/benchbot/cli.py # Typer CLI (validate / run / list / show / events / serve)
migrations/ # Alembic migrations (async env, initial schema)
web/ # React + TypeScript live observability dashboard (SSE)
examples/ # sample protocols (one valid, one broken)
tests/ # pytest suite
Dockerfile # uv-based image: migrate then serve
docker-compose.yml # one-command stack with a persistent volume
docker/entrypoint.sh # alembic upgrade head + uvicorn
.github/workflows/ci.yml # ruff + mypy + pytest + migration drift check
MIT
Example POST /runs request body with fault and retry settings:

{
  "protocol": { "version": 1, "labware": [...], "liquids": [...], "steps": [...] },
  "faults": { "seed": 7, "transient_rate": 0.3, "hard_rate": 0.02 },
  "retry": { "max_attempts": 5 }
}