#orchestration #workflow-automation #planning #htn #workflow #automation

mahler

An automated job orchestration library that builds and executes dynamic workflows

40 releases (10 breaking)

0.25.12 May 5, 2026
0.25.10 Mar 23, 2026
0.25.5 Feb 23, 2026
0.23.0 Dec 22, 2025
0.18.0 Jul 29, 2025

#955 in Asynchronous

874 downloads per month

Apache-2.0

535KB
11K SLoC

mahler

mahler is a library and engine for automatic orchestration of jobs into executable workflows.

More information about this crate can be found in the crate documentation.

Overview

Modern infrastructure requires intelligent automation that can adapt to changing conditions. Traditional static workflows become unmanageable as system complexity grows: you end up with brittle scripts that break when conditions change.

Mahler solves this by using automated planning to generate workflows dynamically. Instead of writing complex conditional logic, you define tasks as simple Rust functions, and the planner automatically discovers the right sequence of operations to reach a desired target state.

Core Features

  • Simple API - system state is defined as Rust structs with the help of the provided derive crate. Allowed tasks are defined as pure Rust functions acting on part or all of the system state.
  • State engine with integrated planner - tasks are configured as jobs in a Worker domain. When given a new target state, the worker determines the changes needed to reach it and searches for a workflow that takes the system from the current state to the target.
  • Concurrent execution - the internal planner detects when tasks can run concurrently based on state paths.
  • Automatic re-planning - re-computes the workflow when runtime conditions change.
  • Observable runtime - monitor the evolving state of the system from the Worker API. For more detailed logging, the library uses the tracing crate.
  • Easy to debug - the worker's observable state and known goals make issues easy to reproduce when they occur.
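
To make the idea of path-based concurrency more concrete, here is a minimal sketch using only the standard library. It assumes one plausible rule: two tasks may run concurrently when neither task's state path is a prefix of the other's. The `segments` and `paths_conflict` helpers are hypothetical names for this illustration and do not describe Mahler's internal algorithm.

```rust
/// Split a slash-separated state path such as "/counters/a" into segments.
/// The root path "/" yields no segments.
fn segments(path: &str) -> Vec<&str> {
    path.split('/').filter(|s| !s.is_empty()).collect()
}

/// Two paths conflict when they are equal or one is a prefix of the other,
/// because a change to a parent location may affect its children.
fn paths_conflict(a: &str, b: &str) -> bool {
    let (a, b) = (segments(a), segments(b));
    // `zip` stops at the shorter path, so this checks the common prefix only.
    a.iter().zip(b.iter()).all(|(x, y)| x == y)
}
```

Under this rule, tasks on `/a` and `/b` do not conflict and could run in parallel, while a task on `/a` and a task on `/a/x` would have to run one after the other.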

Basic Usage

In Mahler, a Task is an operation on a part of the system state that may choose to make changes given some target. It is defined as a pure Rust function. A Job is the configuration of a task as an operation on the system state. For instance, a task may be "increment counter", but the job defines "use the 'increment counter' task for every counter that requires an update operation to reach the target".

This separation lets the planner compose your jobs into workflows that achieve complex state transitions. Jobs are evaluated during planning to determine applicability to a given target and later executed at runtime.
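
As a rough illustration of the task/job split, the sketch below models a job as a route, an operation, and a task name, and selects candidate tasks for a pending operation on a path. The `Job` struct, `Operation` enum, `route_matches`, and `candidates` are hypothetical and only approximate the idea; they are not Mahler types.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operation {
    Create,
    Update,
    Delete,
}

/// Hypothetical job record: where it applies, for which operation, and which task to try.
struct Job {
    route: &'static str,
    operation: Operation,
    task: &'static str,
}

/// A route matches a path when both have the same number of segments and every
/// route segment is either a `{placeholder}` or equal to the path segment.
fn route_matches(route: &str, path: &str) -> bool {
    let r: Vec<&str> = route.split('/').filter(|s| !s.is_empty()).collect();
    let p: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
    r.len() == p.len()
        && r.iter()
            .zip(&p)
            .all(|(rs, ps)| (rs.starts_with('{') && rs.ends_with('}')) || rs == ps)
}

/// Return the names of all tasks registered for this operation on this path.
fn candidates(jobs: &[Job], operation: Operation, path: &str) -> Vec<&'static str> {
    jobs.iter()
        .filter(|job| job.operation == operation && route_matches(job.route, path))
        .map(|job| job.task)
        .collect()
}
```

With two jobs registered for `Operation::Update` on `/{counter}`, a pending update on `/a` yields both task names as candidates, while a pending delete on `/a` yields none.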

We'll create a system controller for a simple counting system. Let's define a task that operates on an i32:

use std::time::Duration;
use tokio::time::sleep;

use mahler::task::{IO, with_io, enforce};
use mahler::extract::{Target, View};

// `plus_one` defines a task that updates a counter if it is below some target.
// The task makes use of two extractors:
// - `View`, that provides a mutable view into the system state. By modifying the view,
// the task can affect the global state
// - `Target`, providing a read-only view of the target being sought by the planner
fn plus_one(mut counter: View<i32>, Target(tgt): Target<i32>) -> IO<i32> {
    // abort the task if the target has already been reached
    enforce!(*counter < tgt);

    // Modify the counter value if we are below the target
    *counter += 1;

    // This task is called both at planning time and at runtime. The `with_io`
    // function lets us define what is returned in each context: the first
    // argument is what the planner receives, and the closure is what will be
    // executed at runtime if the task is selected.
    with_io(counter, |counter| async {
        // The async call can be used to actually make changes to the underlying system.
        // It could be writing the counter to a database or a file. In this
        // case we just add some timer
        sleep(Duration::from_millis(10)).await;
        Ok(counter)
    })
}

The task above updates the counter if it is below the target, or it aborts otherwise, which tells the planner that a task pre-condition has not been met.
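
The role of a pre-condition during planning can be sketched without Mahler at all. In the toy model below, a task returns `None` when its pre-condition fails, and a loop simulates the task on a copy of the state until the target is reached. The names `plus_one_sim` and `plan_steps` are hypothetical, and the search is far simpler than a real planner.

```rust
/// Toy task: given a simulated counter and a target, return the new counter,
/// or `None` when the pre-condition (counter below target) does not hold.
fn plus_one_sim(counter: i32, target: i32) -> Option<i32> {
    if counter < target {
        Some(counter + 1)
    } else {
        None
    }
}

/// Toy planner: repeatedly apply the task to a simulated state.
/// Returns the number of steps in the plan, or `None` if the task
/// stops being applicable before the target is reached.
fn plan_steps(mut counter: i32, target: i32) -> Option<usize> {
    let mut steps = 0;
    while counter != target {
        // `?` aborts the search as soon as the pre-condition fails.
        counter = plus_one_sim(counter, target)?;
        steps += 1;
    }
    Some(steps)
}
```

Here `plan_steps(0, 2)` finds a two-step plan, while `plan_steps(3, 1)` returns `None` because incrementing can never lower the counter.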

The function above defines an atomic task (also called an action), but we can also define compound tasks (also called methods), which let us bias the planner toward certain workflows depending on the conditions. Let's define a task to increase the counter by two.

// `Handler` provides `into_task`; `Task` is assumed to be exported from `mahler::task` as well
use mahler::task::{Handler, Task};
use mahler::extract::{Target, View};

// `plus_two` is a compound task or method. A method does not modify the state directly
// but returns a combination of tasks to be executed to reach the given target.
fn plus_two(counter: View<i32>, Target(tgt): Target<i32>) -> Vec<Task> {
    // If the difference between the current state and target is >1
    if tgt - *counter > 1 {
        // Then return a sequence of two tasks. The target for the tasks is
        // automatically inherited from the method
        return vec![plus_one.into_task(), plus_one.into_task()];
    }

    // Otherwise do nothing
    vec![]
}
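
The relationship between methods and actions can be illustrated with a small standalone model. In this sketch a method expands into primitive actions, and a toy planner tries the method first and falls back to the single action when the method returns nothing. `Action`, `plus_two_method`, and `plan` are hypothetical names, and the fixed "method first" ordering is an assumption of this example rather than a statement about how Mahler ranks candidates.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    PlusOne,
}

/// Toy method: expands into two primitive actions when the gap is larger than one,
/// and into nothing otherwise.
fn plus_two_method(counter: i32, target: i32) -> Vec<Action> {
    if target - counter > 1 {
        vec![Action::PlusOne, Action::PlusOne]
    } else {
        vec![]
    }
}

/// Toy planner: prefer the method, fall back to a single `PlusOne` action.
/// Each entry records which task was chosen and the actions it expanded to.
fn plan(mut counter: i32, target: i32) -> Vec<(&'static str, Vec<Action>)> {
    let mut steps = Vec::new();
    while counter < target {
        let expansion = plus_two_method(counter, target);
        let (name, actions) = if expansion.is_empty() {
            ("plus_one", vec![Action::PlusOne])
        } else {
            ("plus_two", expansion)
        };
        // Simulate the effect of the chosen actions on the counter.
        counter += actions.len() as i32;
        steps.push((name, actions));
    }
    steps
}
```

For a counter at 0 and a target of 3, `plan(0, 3)` yields one `plus_two` expansion followed by a single `plus_one`.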

To use the tasks, we need to create a system model where the tasks will be applied. We'll use a Map.

use mahler::state::{Map, State};

// The `State` derive macro allows modelling the system in a way that
// can be evaluated by Mahler
#[derive(State, Debug, PartialEq, Eq)]
struct Counters(Map<String, i32>);

Finally, in order to create and run workflows, we need a Worker:

use mahler::worker::Worker;
use mahler::job::update;

let mut worker = Worker::new()
    // try the task `plus_one` if an `update` operation on a counter is required
    // to reach the target state
    .job("/{counter}", update(plus_one))
    // try the task `plus_two` if an `update` operation on a counter is required
    // to reach the target state
    .job("/{counter}", update(plus_two))
    .initial_state(Counters(Map::from([
        ("a".to_string(), 0),
        ("b".to_string(), 0),
    ])))
    .unwrap();

// Tell the worker to look for and execute a plan to a target state, returning the updated state
let (state, status) = worker.seek_target(CountersTarget(Map::from([
    ("a".to_string(), 1),
    ("b".to_string(), 2),
])))
.await?;

println!("Final state: {:?}", state);

Note that plus_one and plus_two are configured as jobs for update operations on a counter, referenced by the route /{counter}. This means that, when receiving a new target, the worker will compare the current state with the target and convert the difference into a set of operations (update, create, delete) on specific paths of the state, and consider plus_one and plus_two as candidates whenever an update operation on a counter is required.
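
The comparison step described above can be approximated with a short standalone sketch. It diffs two maps of counters and emits one operation per changed path. The `Op` enum and `diff` function are hypothetical and only mirror the shape of the "pending changes" a worker reports; Mahler's own diffing is not shown here.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq, Eq)]
enum Op {
    Create { path: String, value: i32 },
    Update { path: String, value: i32 },
    Delete { path: String },
}

/// Compare the current counters with the target and list the operations
/// needed to turn one into the other.
fn diff(current: &BTreeMap<String, i32>, target: &BTreeMap<String, i32>) -> Vec<Op> {
    let mut ops = Vec::new();
    for (key, &value) in target {
        match current.get(key) {
            // Key missing from the current state: it has to be created.
            None => ops.push(Op::Create { path: format!("/{key}"), value }),
            // Key present with a different value: it has to be updated.
            Some(&cur) if cur != value => {
                ops.push(Op::Update { path: format!("/{key}"), value })
            }
            // Key already at the target value: nothing to do.
            Some(_) => {}
        }
    }
    // Keys that exist now but not in the target have to be deleted.
    for key in current.keys() {
        if !target.contains_key(key) {
            ops.push(Op::Delete { path: format!("/{key}") });
        }
    }
    ops
}
```

For a current state of `{a: 0, b: 0}` and a target of `{a: 1, b: 2}`, this produces an update on `/a` with value 1 and an update on `/b` with value 2.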

Complete Example

Here's the full runnable example with logging and error handling:

use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use mahler::state::{Map, State};
use mahler::worker::Worker;
use mahler::task::{Handler, IO, with_io};
use mahler::job::update;
use mahler::extract::Args;
use mahler::result::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize structured logging with tracing-subscriber
    // This provides human-readable logs for workflow execution monitoring
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(
            fmt::layer()
                .with_span_events(FmtSpan::CLOSE)
                .event_format(fmt::format().compact().with_target(false)),
        )
        .init();

    let mut worker = Worker::new()
        // The jobs are applicable to `update` operations
        // on individual counters
        .job(
            "/{counter}",
            update(plus_one)
                // we can add a description to the job for the logs
                .with_description(|Args(counter): Args<String>| format!("{counter}++")),
        )
        .job("/{counter}", update(plus_two))
        // We initialize the worker with two counters
        // `a` and `b` with value 0
        .initial_state(Counters(Map::from([
            ("a".to_string(), 0),
            ("b".to_string(), 0),
        ])))?;

    // Tell the worker to find a plan from the initial state (a:0, b:0)
    // to the target state (a:1, b:2) and execute it. If a plan is found and executed
    // successfully, it will return the updated system state
    let (state, status) = worker
        .seek_target(CountersTarget(Map::from([
            ("a".to_string(), 1),
            ("b".to_string(), 2),
        ])))
        .await?;

    assert_eq!(
        state,
        Counters(Map::from([("a".to_string(), 1), ("b".to_string(), 2),]))
    );

    println!("The system state is now {:?}", state);
    Ok(())
}

When receiving a call to seek_target, the worker looks for a plan to get the system to the given target state. The plan can be seen in the logs further down, but its representation is

+ ~ - a++
  ~ - b++
- b++

which is equivalent to the following graph

graph TD
    start(( ))
    fork(( ))
    start --> fork
    a0(a++)
    fork --> a0
    fork --> b0
    b0(b++)
    join(( ))
    a0 --> join
    b0 --> join
    b1(b++)
    join --> b1
    stop(( ))
    b1 --> stop
    start:::initial
    stop:::finish
    classDef initial stroke:#000,fill:#fff
    classDef finish stroke:#000,fill:#000
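
The fork/join shape of this graph can be mimicked with standard-library threads. The sketch below is only an analogy for the workflow structure, assuming a shared map behind a `Mutex`; the `increment` and `run_workflow` functions are hypothetical and unrelated to how Mahler schedules tasks.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::thread;

/// Add one to the named counter, creating it at zero if it does not exist.
fn increment(state: &Mutex<BTreeMap<String, i32>>, key: &str) {
    let mut guard = state.lock().unwrap();
    *guard.entry(key.to_string()).or_insert(0) += 1;
}

/// Run the workflow from the graph: `a++` and `b++` concurrently, then `b++`.
fn run_workflow(state: &Mutex<BTreeMap<String, i32>>) {
    // Fork: the two branches touch different counters.
    thread::scope(|s| {
        s.spawn(|| increment(state, "a"));
        s.spawn(|| increment(state, "b"));
    });
    // Join: `thread::scope` returns only after both branches have finished,
    // so the second `b++` always runs after the first one.
    increment(state, "b");
}
```

Starting from `a = 0` and `b = 0`, running the workflow leaves `a` at 1 and `b` at 2 regardless of which branch of the fork finishes first.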

The full logs generated by the worker are below.

> RUST_LOG=debug cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.74s
     Running `target/debug/mahler-examples-readme`
2026-02-03T20:03:40.255342Z  INFO seek_target: searching workflow
2026-02-03T20:03:40.255479Z DEBUG seek_target: pending changes:
2026-02-03T20:03:40.255489Z DEBUG seek_target: - {"op": "update","path": /a,"value": 1}
2026-02-03T20:03:40.255497Z DEBUG seek_target: - {"op": "update","path": /b,"value": 2}
2026-02-03T20:03:40.255742Z  INFO seek_target: workflow found time=236.667µs
2026-02-03T20:03:40.255769Z DEBUG seek_target: will execute the following tasks:
2026-02-03T20:03:40.255787Z DEBUG seek_target: + ~ - a++
2026-02-03T20:03:40.255793Z DEBUG seek_target:   ~ - b++
2026-02-03T20:03:40.255799Z DEBUG seek_target: - b++
2026-02-03T20:03:40.255805Z  INFO seek_target: executing workflow
2026-02-03T20:03:40.255922Z  INFO seek_target:run_task: starting task=a++
2026-02-03T20:03:40.255960Z  INFO seek_target:run_task: starting task=b++
2026-02-03T20:03:40.269868Z  INFO seek_target:run_task: close time.busy=57.2µs time.idle=13.9ms task=a++
2026-02-03T20:03:40.269932Z  INFO seek_target:run_task: close time.busy=22.5µs time.idle=14.0ms task=b++
2026-02-03T20:03:40.270150Z  INFO seek_target:run_task: starting task=b++
2026-02-03T20:03:40.284168Z  INFO seek_target:run_task: close time.busy=110µs time.idle=13.9ms task=b++
2026-02-03T20:03:40.284648Z  INFO seek_target: workflow executed successfully time=28.840709ms
2026-02-03T20:03:40.284734Z  INFO seek_target: close time.busy=1.55ms time.idle=27.9ms
The system state is now Counters({"a": 1, "b": 2})

The source code for the example can be seen at examples/readme. A more advanced example can be seen in the examples/composer directory.

Contributing

Thank you for your interest in contributing!

Issues

Feature requests and bug reports should be submitted via issues. For bug reports, including a reproduction will make the issue more likely to be prioritized and resolved. For feature requests, providing a description of the use case and the reasoning behind the feature request will encourage discussion and help prioritization.

Pull requests

Pull requests are the way concrete changes are made to the code in this repository. Here are a few guidelines to make the process easier.

  • Every PR should have an associated issue, and the PR's opening comment should say "Fixes #issue" or "Closes #issue".
  • We use Versionist to manage versioning (and in particular, semantic versioning) and generate the changelog for this project.
  • At least one commit in a PR should have a Change-Type: type footer, where type can be patch, minor or major. The subject of this commit will be added to the changelog.
  • Commits should be squashed as much as makes sense.

License

This project is licensed under the Apache License 2.0.

Dependencies

~8–12MB
~138K SLoC