The WordPress workflow engine
Queuety is a WordPress plugin that provides a fast job queue, durable workflow engine, and durable state machine runtime. Workers claim jobs directly from MySQL via PDO and process them inside a long-running WP-CLI process.
The problem: WordPress has no real background job system. `wp_cron` only fires on page visits. Action Scheduler boots the entire WordPress stack for every batch. An LLM API call that takes 60 seconds gets killed by PHP's default 30-second execution timeout. There's no way to run multi-step processes that survive crashes and resume where they left off.
Queuety solves this with three primitives:
- Jobs for fire-and-forget background work
- Workflows for durable multi-step processes with persistent state
- State machines for long-lived, event-driven lifecycle state
Prerequisites:
- PHP 8.2+
- WordPress 6.4+
- MySQL 5.7+ or MariaDB 10.3+
- `pdo_mysql` enabled for the PHP runtime that loads WordPress and WP-CLI
For Composer-managed WordPress installs (for example Bedrock):

```sh
composer require queuety/queuety
wp plugin activate queuety
```

For a packaged plugin zip:

```sh
wp plugin install /path/to/queuety.zip --activate
```

For local development from this repository:

```sh
composer install
wp plugin activate queuety
```

If `pdo_mysql` is missing, the plugin stays loaded but inert and shows an admin notice instead of fataling during activation or bootstrap.
Out of the box, the plugin schedules a one-shot worker through WordPress cron every minute for the default queue. That gives you a basic, no-shell fallback for jobs and workflows that stay on the default queue, as long as WordPress cron is firing. For custom queues, lower latency, or production reliability, run dedicated `wp queuety work` processes.
Dispatch a job using the modern dispatch API:

```php
use Queuety\Contracts\Job;
use Queuety\Dispatchable;

readonly class SendEmailJob implements Job {
    use Dispatchable;

    public function __construct(
        public string $to,
        public string $subject,
        public string $body,
    ) {}

    public function handle(): void {
        wp_mail( $this->to, $this->subject, $this->body );
    }
}

SendEmailJob::dispatch( 'user@example.com', 'Welcome', 'Hello from Queuety!' );
```

Or use the classic handler name API:

```php
use Queuety\Queuety;

Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] );
```

Run a durable workflow with timers and signals:
```php
$workflow_id = Queuety::workflow( 'approval_flow' )
    ->then( SubmitRequestHandler::class )
    ->sleep( hours: 24 )
    ->await_approval()
    ->then( ProcessApprovalHandler::class )
    ->on_cancel( CleanupHandler::class )
    ->dispatch( [ 'request_id' => 99 ] );

// Later, from another process:
Queuety::signal( $workflow_id, 'approval', [ 'approved_by' => 'admin@example.com' ] );
```

Build runtime-discovered branch work with compensation:
```php
use Queuety\Enums\JoinMode;

Queuety::workflow( 'agent_run' )
    ->then( PlanTasksStep::class )
    ->fan_out(
        items_key: 'tasks',
        handler_class: ExecuteTaskStep::class,
        result_key: 'task_results',
        join_mode: JoinMode::Quorum,
        quorum: 2,
        reducer_class: SummarizeTaskResults::class,
    )
    ->compensate_with( ReleaseTaskLocks::class )
    ->compensate_on_failure()
    ->then( FinalizeRunStep::class )
    ->dispatch( [ 'run_id' => 99 ] );
```

Coordinate one workflow with another:
```php
$research_id = Queuety::workflow( 'research_run' )
    ->then( PlanResearchStep::class )
    ->fan_out( 'tasks', ExecuteResearchTask::class, 'results' )
    ->dispatch( [ 'brief_id' => 42 ] );

Queuety::workflow( 'editorial_run' )
    ->await_workflow( $research_id, 'research' )
    ->then( DraftBriefStep::class )
    ->dispatch();
```

Repeat an earlier step until review state says the run can continue:
```php
Queuety::workflow( 'brief_review_loop' )
    ->max_transitions( 10 )
    ->then( DraftBriefStep::class, 'draft' )
    ->await_decision( result_key: 'review' )
    ->repeat_until(
        target_step: 'draft',
        condition_class: ReviewApprovedCondition::class,
        max_iterations: 5,
    )
    ->then( PublishBriefStep::class )
    ->dispatch( [ 'brief_id' => 42 ] );
```

Model an event-driven agent session with explicit lifecycle state:
```php
use Queuety\Enums\StateMachineStatus;

$machine_id = Queuety::machine( 'agent_session' )
    ->state( 'awaiting_user' )
        ->on( 'user_message', 'planning' )
    ->state( 'planning' )
        ->action( PlanSessionAction::class )
        ->on( 'planned', 'awaiting_review' )
    ->state( 'awaiting_review' )
        ->on( 'approve', 'completed', ReviewApprovedGuard::class )
    ->state( 'completed', StateMachineStatus::Completed )
    ->dispatch( [ 'thread_id' => 42 ] );

Queuety::machine_event(
    $machine_id,
    'user_message',
    [ 'message' => 'Find competitors for this product' ]
);
```

Trigger workflows directly from WordPress actions:
```php
Queuety::on_action(
    'save_post',
    workflow: 'content_review',
    map: static fn ( int $post_id, object $post, bool $update ): array => [
        'post_id'   => $post_id,
        'post_type' => $post->post_type,
        'update'    => $update,
    ],
    when: static fn ( int $post_id, object $post ): bool => 'post' === $post->post_type,
    idempotency_key: static fn ( int $post_id ): string => "save_post:{$post_id}",
);
```

Hand work off to independent child workflows and join later:
```php
use Queuety\Enums\WaitMode;

$agent_task = Queuety::workflow( 'agent_task' )
    ->then( ResearchTopicStep::class )
    ->then( SummarizeTopicStep::class );

Queuety::workflow( 'brief_research' )
    ->then( PlanTopicsStep::class )
    ->spawn_workflows( 'topics', $agent_task, 'child_workflow_ids' )
    ->await_workflows( 'child_workflow_ids', WaitMode::All, 'child_results' )
    ->then( SynthesizeBriefStep::class )
    ->dispatch( [ 'brief_id' => 42 ] );
```

For a fuller planner/executor walkthrough with human review and cross-workflow waits, see the Agent Orchestration docs page.
For WordPress-specific workflow triggers, see the Action Triggers docs page.
Add workflow guardrails for agent runs:
```php
Queuety::workflow( 'agent_research' )
    ->version( 'research.v2' )
    ->idempotency_key( 'brief:42:research' )
    ->max_transitions( 20 )
    ->max_fan_out_items( 12 )
    ->max_state_bytes( 32768 )
    ->then( PlanResearchStep::class )
    ->fan_out( 'tasks', ExecuteResearchTask::class, 'results' )
    ->dispatch( [ 'brief_id' => 42 ] );
```

Dispatch a batch with callbacks:
```php
$batch = Queuety::create_batch( [
    new ImportUsersJob( $chunk_1 ),
    new ImportUsersJob( $chunk_2 ),
    new ImportUsersJob( $chunk_3 ),
] )
    ->name( 'Import users' )
    ->then( ImportCompleteHandler::class )
    ->catch( ImportFailedHandler::class )
    ->finally( ImportCleanupHandler::class )
    ->allow_failures()
    ->on_queue( 'imports' )
    ->dispatch();
```

Add middleware to a job:
```php
use Queuety\Contracts\Job;
use Queuety\Dispatchable;
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\ThrottlesExceptions;

readonly class CallExternalApiJob implements Job {
    use Dispatchable;

    public function __construct( public int $record_id ) {}

    public function handle(): void {
        // Call external API...
    }

    public function middleware(): array {
        return [
            new RateLimited( max: 60, window: 60 ),
            new ThrottlesExceptions( max_attempts: 3, decay_minutes: 5 ),
        ];
    }
}
```

Start a worker:

```sh
wp queuety work
```

Scale a worker pool between a warm minimum and a backlog-driven maximum:

```sh
wp queuety work --queue=providers --min-workers=2 --max-workers=6
```

- Fast execution -- workers use direct MySQL queue access and avoid per-job cron/bootstrap overhead
- Durable workflows -- multi-step processes with persistent state that survive PHP timeouts, crashes, and retries
- Dispatchable jobs -- self-contained readonly job classes with the `Dispatchable` trait and `Contracts\Job` interface
- Middleware pipeline -- onion-style middleware for rate limiting, throttling, uniqueness, and custom logic
- Batching -- dispatch groups of jobs with `then`, `catch`, and `finally` callbacks plus progress tracking
- Job chaining -- sequential job execution where each job depends on the previous one completing
- Durable timers -- `sleep()` steps that survive process restarts and resume at the right time
- Signals and human gates -- `wait_for_signal()`, `wait_for_signals()`, `await_approval()`, and `await_input()` pause workflows until external input arrives
- Workflow dependencies -- `await_workflow()` and `await_workflows()` coordinate top-level workflows without forcing them into one workflow definition
- Async workflow handoffs -- `spawn_workflows()` turns runtime-discovered items into independent top-level workflows that can be awaited later, including named spawned groups
- Dynamic fan-out -- `fan_out()` expands runtime-discovered work with `All`, `FirstSuccess`, and `Quorum` join modes
- Durable loops -- `repeat_until()` and `repeat_while()` revisit earlier named steps without hiding the back-edge inside arbitrary step code
- Durable artifacts -- store named workflow outputs outside the main state bag and inspect them later through status, CLI, export, and replay
- Workflow guardrails -- `version()`, a deterministic definition hash, `idempotency_key()`, `max_transitions()`, `max_fan_out_items()`, `max_state_bytes()`, `max_cost_units()`, and `max_spawned_workflows()` keep long-running runs inside a defined envelope
- Resource-aware admission -- concurrency groups, weighted queue and provider budgets, observed memory/time headroom, and optional container-awareness let workers defer expensive work instead of oversubscribing a process
- Step compensation -- `compensate_with()` and `compensate_on_failure()` provide saga-style rollback hooks for completed steps
- Streaming steps -- `StreamingStep` interface with `ChunkStore` for persisting streamed data chunk by chunk
- Cache layer -- pluggable cache with `MemoryCache` and `ApcuCache` backends, auto-detected via `CacheFactory`
- Heartbeats -- long-running steps send heartbeats to prevent premature stale-job recovery
- Workflow cancellation -- cancel running workflows and trigger registered cleanup handlers
- Workflow event log -- full timeline of step transitions with state snapshots and time-travel debugging
- State pruning -- automatic removal of old step outputs to keep workflow state lean
- Schedule overlap policies -- Allow, Skip, or Buffer for recurring jobs
- Multi-queue worker priorities -- process multiple queues with strict priority ordering
- Parallel steps -- run steps concurrently and wait for all to complete before advancing
- Conditional branching -- skip to named steps based on prior state or use first-class loop steps for explicit back-edges
- Sub-workflows -- spawn child workflows that feed results back to the parent
- Priority queues -- 4 levels (Low, Normal, High, Urgent) via type-safe enums
- Rate limiting -- per-handler execution limits with sliding window
- Recurring jobs -- interval-based (`every('1 hour')`) and cron-based (`cron('0 3 * * *')`) scheduling
- Job dependencies -- job B waits for job A to complete before running
- Unique jobs -- prevent duplicate dispatches for the same handler and payload
- Job properties -- `$tries`, `$timeout`, `$backoff`, `$concurrency_group`, `$concurrency_limit`, and `$cost_units` declared directly on job classes
- `failed()` hook -- called on the job instance when all retries are exhausted
- Conditional dispatch -- `dispatch_if()` and `dispatch_unless()` on the `Dispatchable` trait
- Synchronous dispatch -- `dispatch_sync()` runs a job immediately without the queue
- Timeout enforcement -- kill jobs that exceed max execution time
- Worker concurrency -- `--workers=N` forks multiple processes with automatic restart on crash
- Permanent logging -- queryable database log of every job and workflow execution
- Metrics API -- throughput, latency percentiles, and error rates per handler
- Webhooks -- HTTP notifications on job/workflow events
- Testing utilities -- `QueueFake` for asserting dispatched jobs and `create_batch()` batches in tests
- PHP attributes -- `#[QueuetyHandler('name')]` for auto-registration
- ThrottlesExceptions -- back off when external services are down to prevent job storms
- Debug mode -- verbose worker logging for development
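The testing utilities can be sketched roughly as follows. `QueueFake` comes from the feature list above, but the namespace, the `fake()` setup call, and the `assert_dispatched()` assertion are assumed names for illustration, not confirmed Queuety API:

```php
use Queuety\Testing\QueueFake; // namespace is an assumption

final class SendEmailJobTest extends \PHPUnit\Framework\TestCase {
    public function test_welcome_email_is_queued(): void {
        // Swap the real queue for an in-memory fake (method name assumed).
        $fake = QueueFake::fake();

        // Dispatch as production code would; the fake records instead of enqueuing.
        SendEmailJob::dispatch( 'user@example.com', 'Welcome', 'Hi' );

        // Assert the job was captured without being executed (method name assumed).
        $fake->assert_dispatched( SendEmailJob::class );
    }
}
```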
Workers run inside a long-lived WP-CLI process and claim jobs directly from MySQL via PDO. The queue, workflow state, logs, batches, signals, and streaming chunks all live in MySQL, so worker restarts do not lose orchestration state.
Workflows break long-running work into steps. Each step persists its output to a shared state bag in the database. If PHP dies mid-step, the worker retries that step with all prior state intact. The step boundary is a single MySQL transaction: state update, job completion, and next step enqueue all happen atomically.
```text
Workflow: generate_report (3 steps)
Step 0: fetch_data    -> state: {user_data: {...}}
Step 1: call_llm      -> PHP dies -> retry -> state: {user_data: {...}, llm_response: "..."}
Step 2: format_output -> state: {user_data: {...}, llm_response: "...", report_url: "/reports/42.pdf"}
Workflow: completed
```
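Conceptually, that atomic step boundary looks like a single PDO transaction. The sketch below is illustrative only: the SQL, column names, and surrounding variables are assumptions, not Queuety's actual schema or internals:

```php
// Illustrative only: not Queuety's real schema or internals.
$pdo->beginTransaction();

try {
    // 1. Merge the step's output into the workflow state bag.
    $pdo->prepare( 'UPDATE wp_queuety_workflows SET state = :state WHERE id = :id' )
        ->execute( [ 'state' => $merged_state_json, 'id' => $workflow_id ] );

    // 2. Mark the current step's job as completed.
    $pdo->prepare( "UPDATE wp_queuety_jobs SET status = 'completed' WHERE id = :id" )
        ->execute( [ 'id' => $job_id ] );

    // 3. Enqueue the job that runs the next step.
    $pdo->prepare( 'INSERT INTO wp_queuety_jobs (handler, payload) VALUES (:h, :p)' )
        ->execute( [ 'h' => $next_handler, 'p' => $next_payload_json ] );

    // All three effects commit together, or none do.
    $pdo->commit();
} catch ( \Throwable $e ) {
    // On failure the step retries later with all prior state intact.
    $pdo->rollBack();
    throw $e;
}
```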
Classic handler interface:
```php
class SendEmailHandler implements Queuety\Handler {
    public function handle( array $payload ): void {
        wp_mail( $payload['to'], $payload['subject'], $payload['body'] );
    }

    public function config(): array {
        return [
            'queue'        => 'emails',
            'max_attempts' => 5,
        ];
    }
}

Queuety::register( 'send_email', SendEmailHandler::class );
```

Modern dispatchable job class:

```php
use Queuety\Contracts\Job;
use Queuety\Dispatchable;

// Not declared readonly: readonly properties cannot have default values,
// and $tries, $timeout, and $backoff are declared with defaults here.
class SendEmailJob implements Job {
    use Dispatchable;

    public int $tries = 5;
    public int $timeout = 30;
    public array $backoff = [ 10, 30, 60 ];

    public function __construct(
        public string $to,
        public string $subject,
        public string $body,
    ) {}

    public function handle(): void {
        wp_mail( $this->to, $this->subject, $this->body );
    }

    public function failed( \Throwable $e ): void {
        error_log( "Email to {$this->to} failed: {$e->getMessage()}" );
    }
}
```

Workflow step handlers implement the `Step` interface:

```php
class CallLLMHandler implements Queuety\Step {
    public function handle( array $state ): array {
        $response = $this->callAPI( $state['prompt'] );

        return [ 'llm_response' => $response ]; // merged into state
    }

    public function config(): array {
        return [ 'max_attempts' => 5 ];
    }
}
```

For LLM responses or large downloads, streaming steps persist data chunk by chunk so progress survives crashes:
```php
use Queuety\Contracts\StreamingStep;

class StreamLLMHandler implements StreamingStep {
    public function stream( array $state, array $existing_chunks = [] ): \Generator {
        $offset = count( $existing_chunks );

        foreach ( $this->streamApi( $state['prompt'], $offset ) as $chunk ) {
            yield $chunk; // persisted to DB immediately
        }
    }

    public function on_complete( array $chunks, array $state ): array {
        return [ 'response' => implode( '', $chunks ) ];
    }

    public function config(): array {
        return [ 'max_attempts' => 3 ];
    }
}
```

| Command | Description |
|---|---|
| `wp queuety work [--queue=<q>] [--once] [--workers=<n>]` | Start one worker or a fixed worker pool |
| `wp queuety work [--queue=<q>] [--min-workers=<n>] --max-workers=<n>` | Start an adaptive worker pool |
| `wp queuety work --queue=high,default,low` | Process multiple queues with priority ordering |
| `wp queuety flush` | Process all pending jobs and exit |
| `wp queuety dispatch <handler> --payload='{}'` | Dispatch a job |
| `wp queuety status` | Show queue stats |
| `wp queuety list [--queue=<q>] [--status=<s>]` | List jobs |
| `wp queuety inspect <id>` | Show full job details and log history |
| `wp queuety retry <id>` | Retry a job |
| `wp queuety retry-buried` | Retry all buried jobs |
| `wp queuety bury <id>` | Bury a job |
| `wp queuety delete <id>` | Delete a job |
| `wp queuety recover` | Recover stale jobs |
| `wp queuety purge [--older-than=<days>]` | Purge completed jobs |
| `wp queuety pause <queue>` | Pause a queue |
| `wp queuety resume <queue>` | Resume a queue |
| `wp queuety metrics` | Show per-handler metrics |
| `wp queuety discover <dir> --namespace=<ns>` | Auto-discover handlers |
| `wp queuety workflow status <id>` | Show workflow progress |
| `wp queuety workflow retry <id>` | Retry from failed step |
| `wp queuety workflow pause <id>` | Pause a workflow |
| `wp queuety workflow resume <id>` | Resume a workflow |
| `wp queuety workflow list [--status=<s>]` | List workflows |
| `wp queuety workflow cancel <id>` | Cancel a workflow and run cleanup handlers |
| `wp queuety workflow timeline <id>` | Show the full event timeline for a workflow |
| `wp queuety workflow state-at <id> <step>` | Show workflow state snapshot at a specific step |
| `wp queuety schedule list` | List recurring schedules |
| `wp queuety schedule add <handler> [--every=<i>] [--cron=<c>]` | Add a recurring schedule |
| `wp queuety schedule remove <handler>` | Remove a schedule |
| `wp queuety schedule run` | Manually trigger scheduler tick |
| `wp queuety log [--workflow=<id>] [--job=<id>]` | Query log entries |
| `wp queuety log purge --older-than=<days>` | Prune old logs |
| `wp queuety webhook add <event> <url>` | Register a webhook |
| `wp queuety webhook list` | List webhooks |
| `wp queuety webhook remove <id>` | Remove a webhook |
All constants are optional. Define them in `wp-config.php`, or before Queuety boots in an embedded theme-style install.
| Constant | Default | Description |
|---|---|---|
| `QUEUETY_RETENTION_DAYS` | `7` | Auto-purge completed jobs after N days |
| `QUEUETY_LOG_RETENTION_DAYS` | `0` | Auto-purge logs after N days (0 = forever) |
| `QUEUETY_MAX_EXECUTION_TIME` | `300` | Max seconds per job before timeout |
| `QUEUETY_WORKER_SLEEP` | `1` | Seconds to sleep when queue is empty |
| `QUEUETY_WORKER_MAX_JOBS` | `1000` | Max jobs before worker restarts |
| `QUEUETY_WORKER_MAX_MEMORY` | `128` | Max MB before worker restarts |
| `QUEUETY_RETRY_BACKOFF` | `exponential` | Backoff strategy (exponential, linear, fixed) |
| `QUEUETY_STALE_TIMEOUT` | `600` | Seconds before stuck jobs are recovered |
| `QUEUETY_CACHE_TTL` | `5` | Default cache TTL in seconds |
| `QUEUETY_DEBUG` | `false` | Enable verbose worker logging |
| `QUEUETY_CLI_COMMAND` | `queuety` | Root WP-CLI command name |
| `QUEUETY_TABLE_PREFIX` | `queuety_` | Shared base name for all Queuety tables after the WordPress DB prefix |
| `QUEUETY_TABLE_JOBS` | `queuety_jobs` | Jobs table name |
| `QUEUETY_TABLE_WORKFLOWS` | `queuety_workflows` | Workflows table name |
| `QUEUETY_TABLE_LOGS` | `queuety_logs` | Logs table name |
| `QUEUETY_TABLE_SCHEDULES` | `queuety_schedules` | Schedules table name |
| `QUEUETY_TABLE_SIGNALS` | `queuety_signals` | Signals table name |
| `QUEUETY_TABLE_WORKFLOW_DEPENDENCIES` | `queuety_workflow_dependencies` | Workflow dependency waits table name |
| `QUEUETY_TABLE_WORKFLOW_DISPATCH_KEYS` | `queuety_workflow_dispatch_keys` | Durable workflow idempotency table name |
| `QUEUETY_TABLE_CHUNKS` | `queuety_chunks` | Streaming chunks table name |
| `QUEUETY_TABLE_QUEUE_STATES` | `queuety_queue_states` | Queue states table name |
| `QUEUETY_TABLE_WEBHOOKS` | `queuety_webhooks` | Webhooks table name |
`QUEUETY_TABLE_PREFIX` changes the shared Queuety portion of the table names while leaving the WordPress database prefix in place. For example, with `$wpdb->prefix = 'wp_'` and `QUEUETY_TABLE_PREFIX = 'themequeue_'`, the jobs table becomes `wp_themequeue_jobs`. Explicit `QUEUETY_TABLE_*` constants still override individual tables.
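For example, a `wp-config.php` fragment combining the prefix override with a couple of other constants from the table above (the specific values are illustrative):

```php
// In wp-config.php, before WordPress loads plugins.
define( 'QUEUETY_TABLE_PREFIX', 'themequeue_' ); // jobs table becomes wp_themequeue_jobs
define( 'QUEUETY_RETENTION_DAYS', 14 );          // keep completed jobs for two weeks
define( 'QUEUETY_DEBUG', true );                 // verbose worker logging for development
```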
```sh
# Install dependencies
composer install
npm install

# Run the full local validation suite
bash tests/run-all.sh

# Run individual categories when needed
composer cs
composer stan
composer test:unit
composer test:integration
bash tests/e2e/run-all.sh
npm --prefix docs run build

# Run wp-env E2E directly when you only want the WordPress runtime pass
npm run test:e2e:wp-env

# Auto-fix coding standards
composer cs:fix
```

The local runner mirrors the CI categories and keeps Queuety's extra integration layer separate instead of flattening it into the unit suite.
GPL-2.0-or-later