A modular framework for training and running inference with context-aware diffusion models on trajectory data. It provides a registry-driven component system, YAML-based configuration, and built-in support for HuggingFace Accelerate (multi-GPU), CUDA graphs, Weights & Biases logging, and experiment sweeps.
This framework accompanies the paper:
Accelerated Multi-Modal Motion Planning Using Context-Conditioned Diffusion Models. Edward Sandra, Lander Vanroye, Dries Dirckx, Ruben Cartuyvels, Jan Swevers, Wilm Decré. arXiv:2510.14615 (https://arxiv.org/abs/2510.14615)
If you use this framework in your research, please cite:
@misc{sandra2025campd,
  title = {Accelerated Multi-Modal Motion Planning Using Context-Conditioned Diffusion Models},
  author = {Sandra, Edward and Vanroye, Lander and Dirckx, Dries and Cartuyvels, Ruben and Swevers, Jan and Decr\'{e}, Wilm},
  year = {2025},
  eprint = {2510.14615},
  archivePrefix = {arXiv},
  primaryClass = {cs.RO},
  url = {https://arxiv.org/abs/2510.14615},
}

Full API documentation for the project's codebase, including all registries, architectures, and experiments, is available at the CAMPD API Docs.
- Overview
- Installation
- Training Data
- Core Concepts
- Launching Experiments
- YAML Configuration Reference
- Extending the Framework
- Built-in Components
- Troubleshooting
CAMPD is built around a diffusion-model pipeline for trajectory generation conditioned on (but not limited to) environment context (e.g. obstacle geometries). The high-level flow is:
- Data: Load trajectory datasets from HDF5 files (with context fields like cuboid/cylinder/sphere obstacle descriptions).
- Model: A ContextTrajectoryDiffusionModel wrapping HuggingFace Diffusers schedulers, a reverse-diffusion denoising network (e.g. TemporalUnet), and an optional context encoder.
- Training: A Trainer runs the training loop with configurable objectives, callbacks, summaries, multi-objective optimization (TorchJD), AMP, gradient clipping, and optional CUDA graph acceleration.
- Inference: Load a trained checkpoint and sample trajectories, optionally validating them with domain-specific validators.
Everything is wired together through YAML config files and a registry system, so you can swap components without changing code.
- Python ≥ 3.10
- CUDA-capable GPU (recommended)
pip install campd

This installs the campd package and the campd-run CLI entry point.
Note: Some example projects (e.g. examples/franka_curobo/) may have additional dependencies not listed in pyproject.toml (e.g. curobo, pinocchio). These are installed separately; check each example's requirements.txt.

Note: If you want to enable Weights & Biases logging, install it with pip install wandb and run wandb login to authenticate.
Training datasets for the sphere-based and MPINets environments are stored as Git LFS objects in a separate repository. Download and extract them before running the example experiments:
GIT_LFS_SKIP_SMUDGE=1 git clone https://github.com/meco-group/campd-data.git /tmp/campd-data
(cd /tmp/campd-data && git lfs pull)
mkdir -p data/train/
tar -xzf /tmp/campd-data/train_data_campd_franka_spheres.tar.gz -C data/train/
tar -xzf /tmp/campd-data/train_data_campd_mpinets.tar.gz -C data/train/
rm -rf /tmp/campd-data

The framework uses a registry pattern to enable config-driven component selection. Each category of component has its own Registry instance:
| Registry | Module | Purpose |
|---|---|---|
| EXPERIMENTS | experiments/registry.py | Experiment types |
| MODULES | architectures/registry.py | Generic nn.Module building blocks |
| REVERSE_NETS | architectures/registry.py | Denoising networks |
| CONTEXT_NETS | architectures/registry.py | Context encoder networks |
| LOSSES | training/registry.py | Loss functions |
| CALLBACKS | training/registry.py | Training callbacks |
| SUMMARIES | training/registry.py | Training summaries |
| OBJECTIVES | training/registry.py | Training objectives |
| VALIDATORS | experiments/validators.py | Inference validators |
Components self-register using a decorator:
from campd.training.callbacks import Callback
from campd.training.registry import CALLBACKS

@CALLBACKS.register("MyCallback")
class MyCallback(Callback):
    ...

Then in the YAML config:
callbacks:
  - cls: "MyCallback"

Critical: For a component to be available at runtime, its module must be imported before the registry lookup happens. This is handled by two mechanisms:
- campd/all_imports.py: Bulk-imports all built-in subpackages (architectures, data, experiments, models, training), which triggers their __init__.py chains and populates the registries with built-in components.
- The dependencies key in YAML config: Imports external/example-specific modules at startup (see Dependencies / Imports in YAML).
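To make the registration mechanics concrete, here is a minimal sketch of a decorator-based registry. It is an illustration only: MiniRegistry, its get method, CALLBACKS_DEMO, and the error message wording are assumptions made for this example and are not CAMPD's actual Registry class from utils/registry.py.

```python
class MiniRegistry:
    """Hypothetical stand-in for a registry; not CAMPD's implementation."""

    def __init__(self, name):
        self.name = name
        self._items = {}

    def register(self, key):
        # Return a decorator that stores the class under `key`
        # and hands the class back unchanged.
        def decorator(cls):
            if key in self._items:
                raise KeyError(f"'{key}' already registered in '{self.name}'")
            self._items[key] = cls
            return cls
        return decorator

    def get(self, key):
        if key not in self._items:
            raise KeyError(f"Unknown '{key}' in registry '{self.name}'")
        return self._items[key]


CALLBACKS_DEMO = MiniRegistry("callbacks")


@CALLBACKS_DEMO.register("MyCallback")
class MyCallback:
    pass


# Registration runs when the class statement executes, i.e. at import time.
found = CALLBACKS_DEMO.get("MyCallback")
```

Because the decorator only runs when the defining module is imported, looking up a key whose module was never imported fails with a KeyError in this sketch. That is the situation the two import mechanisms above are meant to prevent.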
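Spec (defined in utils/registry.py) is a Pydantic model that describes how to build an object from config. It supports two modes: an init block, whose entries are passed to the constructor as keyword arguments, and a config block, which is passed to the class's from_config classmethod.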
optimizer:
  cls: "torch.optim.Adam"   # Full import path or registry key
  init:
    lr: 1.0e-4
    weight_decay: 0.0

This calls torch.optim.Adam(lr=1e-4, weight_decay=0.0).
objective:
  cls: "DiffusionObjective"   # Registry key
  config:
    loss_fn:
      cls: "torch.nn.MSELoss"
      init:
        reduction: "mean"

This calls DiffusionObjective.from_config(config_dict). The class must have a from_config classmethod.
- If a registry field is set on the Spec, the cls string is looked up in that specific registry.
- Otherwise, the cls string is first tried as a registry key (if a registry is passed to build_from), then as a Python import path (e.g. torch.optim.Adam).
- This means you can reference any importable class by its full dotted path, or use short registry keys for registered components.
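The resolution order described above can be sketched in a few lines of standard-library Python. This is a rough illustration under stated assumptions: resolve_cls and build are hypothetical helper names, the registry is modelled as a plain dict, and the real Spec / build_from code may differ in signature and error handling.

```python
import importlib
from fractions import Fraction


def resolve_cls(name, registry=None):
    """Hypothetical helper: try the registry key first, then a dotted import path."""
    if registry is not None and name in registry:
        return registry[name]
    module_path, _, attr = name.rpartition(".")
    if not module_path:
        raise KeyError(f"'{name}' is neither a registry key nor an import path")
    return getattr(importlib.import_module(module_path), attr)


def build(spec, registry=None):
    """Build an object from a dict holding 'cls' plus either 'config' or 'init'."""
    cls = resolve_cls(spec["cls"], registry)
    if "config" in spec:
        # Config mode: delegate to the class's from_config classmethod.
        return cls.from_config(spec["config"])
    # Init mode: pass the entries as keyword arguments to the constructor.
    return cls(**spec.get("init", {}))


class Greeter:
    def __init__(self, name):
        self.name = name

    @classmethod
    def from_config(cls, cfg):
        return cls(**cfg)


demo_registry = {"Frac": Fraction, "Greeter": Greeter}
init_args = {"numerator": 1, "denominator": 4}

by_path = build({"cls": "fractions.Fraction", "init": init_args})
by_key = build({"cls": "Frac", "init": init_args}, registry=demo_registry)
by_config = build({"cls": "Greeter", "config": {"name": "hello"}}, registry=demo_registry)
```

The dotted path and the short key resolve to the same class here, which mirrors the claim that either form can be used in a cls field.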
The framework uses Pydantic models for configuration validation. A key feature is attribute propagation: parent-level config values are automatically pushed down to nested child configs that share the same field name. For example:
experiment:
  device: "cuda:0"   # Parent-level
  dataset:
    # device is NOT declared here, but if TrajectoryDatasetCfg has a
    # 'device' field, it will receive "cuda:0" from the parent.
    ...
  trainer:
    tensor_args:
      device: "cuda:0"   # Explicit, but could also be propagated

YAML anchors (&name / *name) can be used in config files for DRY configuration.
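The idea behind attribute propagation can be illustrated with plain dictionaries. The sketch below is not the framework's Pydantic-based implementation: propagate is a hypothetical function, child_fields stands in for the set of fields each child config class declares, and the rule that an explicit child value takes precedence over the parent is an assumption.

```python
import copy


def propagate(parent, child_fields):
    """Copy parent values into nested child dicts that declare the same field.

    `child_fields` maps a child key to the field names that child accepts
    (a hypothetical stand-in for the fields of a Pydantic config class).
    """
    result = copy.deepcopy(parent)
    for child_key, fields in child_fields.items():
        child = result.get(child_key)
        if not isinstance(child, dict):
            continue
        for name in fields:
            # Assumption: explicit child values win; only missing ones are filled.
            if name in parent and name not in child:
                child[name] = parent[name]
    return result


experiment = {
    "device": "cuda:0",
    "seed": 42,
    "dataset": {"trajectory_state": "pos"},
    "trainer": {"device": "cpu"},
}
resolved = propagate(
    experiment,
    {"dataset": {"device"}, "trainer": {"device", "seed"}},
)
```

In this sketch the dataset child receives device from the parent, the trainer child keeps its own explicit device but inherits seed, and the dataset child does not receive seed because it does not declare that field.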
The dependencies top-level key in YAML configs lists modules or directories that should be imported before the experiment runs. This is essential for registering custom components (e.g. example-specific summaries, validators, architectures):
dependencies:
  - "../src"               # A directory: all .py files inside are imported
  - "my_custom_module"     # A Python module import path
  - "./my_file.py"         # A single Python file

Paths are resolved relative to the config file's directory.
This is the mechanism that makes custom components available to the registries. If you define a custom @SUMMARIES.register("ValidationSummary") class in examples/franka_curobo/src/training_summary.py, listing "../src" in dependencies ensures it's imported and registered before the config tries to reference "ValidationSummary".
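A loader for the three kinds of dependency entries could look roughly like the sketch below. It uses only importlib from the standard library. The function names import_file and import_dependency are hypothetical, and details such as non-recursive directory scanning and naming modules after the file stem are assumptions, not a description of how campd-run actually does it.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path


def import_file(path):
    """Import a single .py file under a module name derived from its stem."""
    path = Path(path)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[path.stem] = module
    spec.loader.exec_module(module)  # runs the file, so decorators fire here
    return module


def import_dependency(entry, config_dir):
    """Hypothetical resolver: directory, single file, or module import path."""
    candidate = (Path(config_dir) / entry).resolve()
    if candidate.is_dir():
        # Assumption: only top-level .py files, in sorted order.
        return [import_file(p) for p in sorted(candidate.glob("*.py"))]
    if candidate.is_file():
        return [import_file(candidate)]
    return [importlib.import_module(entry)]


# Small self-contained demo with a throwaway directory layout.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src"
    src.mkdir()
    (src / "campd_sketch_plugin.py").write_text("VALUE = 7\n")
    config_dir = Path(tmp) / "configs"
    config_dir.mkdir()
    from_dir = import_dependency("../src", config_dir)
    from_name = import_dependency("json", config_dir)
```

Executing each file is what triggers any @REGISTRY.register(...) decorators inside it, so the registry keys exist by the time the experiment config is built.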
The campd-run CLI is installed as a console script by pip:
campd-run path/to/config.yaml

This:
- Parses the YAML file.
- Extracts the dependencies, experiment, wandb, launcher, and sweep sections.
- Imports built-in and user-defined dependencies to populate registries.
- Uses experiment-launcher to manage experiment execution (seeding, output directories, optional SLURM submission).
- Looks up the experiment class via experiment.cls in the EXPERIMENTS registry and calls its run() method.
Launcher configuration controls experiment management:
launcher:
  exp_name: "my_experiment"
  n_seeds: 1                 # Number of seeds (repetitions)
  start_seed: 0
  base_dir: "results/"       # Output base directory
  use_timestamp: true        # Append timestamp to output dir
  resources:
    n_exps_in_parallel: 1    # Parallel experiments
  # ... (see experiment-launcher docs)

For simpler use cases or debugging, you can bypass the launcher:
import os
from campd.experiments import TrainExperiment

base_dir = os.path.dirname(os.path.abspath(__file__))
exp = TrainExperiment.from_yaml(os.path.join(base_dir, "configs/train.yaml"))
exp.run()

Note: When using from_yaml, the dependencies section is not processed automatically. You must import your custom modules manually before calling from_yaml (e.g. import my_custom_module). The campd-run CLI handles this for you.
A full config file has up to five top-level sections:
# 1. Dependencies: modules/directories to import for custom registrations
dependencies:
  - "../src"

# 2. WandB: Weights & Biases logging
wandb:
  mode: "online"            # "online", "offline", or "disabled"
  entity: "my-team"
  project: "my-project"
  group: "group_name"
  name: &name "run_name"

# 3. Launcher: experiment-launcher settings
launcher:
  exp_name: *name
  base_dir: "results/"
  n_seeds: 1
  # ... (see experiment-launcher docs)

# 4. Sweep: hyperparameter sweep (optional)
sweep:
  trainer:
    lr: [1.0e-4, 1.0e-3]    # Creates one run per value

# 5. Experiment: the actual experiment configuration
experiment:
  cls: "train"              # Registered experiment key

  # Common fields (from ExperimentCfg):
  seed: 42
  device: "cuda:0"
  # results_dir: set by launcher

  # Experiment-specific fields (e.g. TrainExperimentCfg):
  dataset_dir: "data/train/my_dataset"
  train_file: "train.hdf5"
  val_file: "val.hdf5"      # Optional
  # val_set_size: 0.1       # Optional

  dataset:
    trajectory_state: "pos" # "pos", "pos+vel", "pos+vel+acc"
    field_config:
      trajectory_field: "solutions"   # HDF5 key for trajectory data
      q_dim: 7                        # Configuration-space dimension
      context_fields:                 # Maps list of HDF5 keys -> context key
        cuboids: ["cuboid_centers", "cuboid_dims", "cuboid_quaternions"]
        # Note that the subkeys are still accessible inside the
        # TrajectoryContext object
      # It is also possible to use a plain list of HDF5 keys:
      # context_fields:
      #   - "cuboid_centers"
      #   - "cuboid_dims"
      #   - "cuboid_quaternions"
    # ...

  model:
    state_dim: 7
    model_type: "epsilon"   # "epsilon", "sample", or "v_prediction"
    n_diffusion_steps: 25
    network:                # Spec for reverse diffusion network
      cls: "TemporalUnet"
      config: { ... }
    context_network:        # Spec for context encoder (optional)
      cls: "campd.architectures.context.encoder.ContextEncoder"
      config: { ... }

  trainer:
    max_epochs: 200
    optimizer:
      cls: "torch.optim.Adam"
      init: { lr: 1.0e-4 }
    objective:
      cls: "DiffusionObjective"
      config:
        loss_fn:
          cls: "torch.nn.MSELoss"
          init: { reduction: "mean" }
    callbacks:
      - cls: "PrinterCallback"
      - cls: "EMACallback"
        init: { decay: 0.995 }
      - cls: "CheckpointCallback"
        init: { save_best: true }
      - cls: "WandBCallback"
    summaries:
      - cls: "ValidationSummary"      # Custom (from dependencies)
        init: { every_n_steps: 2500 }

The general pattern for adding a new component:
- Create a Python file with your class, inheriting from the appropriate base class.
- Decorate it with @REGISTRY.register("key") using the relevant registry.
- Make sure it's imported at startup, either by placing it in a built-in subpackage (and re-exporting via __init__.py), or by listing its module/directory in the dependencies section of your YAML config.
- Reference it in your YAML config via the registry key.
# my_experiments/custom_exp.py
from campd.experiments.base import BaseExperiment, ExperimentCfg
from campd.experiments.registry import EXPERIMENTS
from pydantic import validate_call

class MyExperimentCfg(ExperimentCfg):
    my_param: str = "default"

@EXPERIMENTS.register("my_experiment")
class MyExperiment(BaseExperiment):
    CfgClass = MyExperimentCfg

    @validate_call
    def __init__(self, cfg: MyExperimentCfg):
        super().__init__(cfg)

    def run(self):
        print(f"Running with param: {self.cfg.my_param}")

Then in the YAML config:

dependencies:
  - "my_experiments"   # Directory containing custom_exp.py
experiment:
  cls: "my_experiment"
  my_param: "hello"

# my_networks/custom_net.py
import torch.nn as nn
from campd.architectures.registry import REVERSE_NETS
from campd.utils.registry import FromCfg

@REVERSE_NETS.register("MyDenoiser")
class MyDenoiser(nn.Module):
    def __init__(self, state_dim: int, hidden_dim: int):
        super().__init__()
        # ... build layers ...

    @classmethod
    def from_config(cls, cfg):
        # Accept either a plain dict or a Pydantic config object.
        if isinstance(cfg, dict):
            return cls(**cfg)
        return cls(**cfg.model_dump())

    def forward(self, x, t, context=None):
        # x: [B, T, state_dim], t: [B], context: EmbeddedContext or None
        ...

Then in the YAML config:

model:
  network:
    cls: "MyDenoiser"
    config:
      state_dim: 7
      hidden_dim: 128

from campd.training.callbacks import Callback
from campd.training.registry import CALLBACKS

@CALLBACKS.register("LRLoggerCallback")
class LRLoggerCallback(Callback):
    def on_epoch_end(self, trainer, train_losses=None):
        lr = trainer.optimizer.param_groups[0]['lr']
        print(f"Current LR: {lr}")

Available hooks: on_train_start, on_fit_start, on_train_end, on_epoch_start, on_epoch_end, on_batch_start, on_batch_end, on_validation_start, on_validation_end, on_summary_end.
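For readers unfamiliar with hook-based callbacks, the sketch below shows how a training loop might dispatch two of the hooks listed above. Everything in it is a simplified assumption: the Callback base class, TinyTrainer, and its _fire method are hypothetical and do not reproduce CAMPD's Trainer. Only the on_epoch_end(trainer, train_losses=None) signature is taken from the example above.

```python
class Callback:
    """Hypothetical base class: every hook is a no-op unless overridden."""

    def on_epoch_start(self, trainer):
        pass

    def on_epoch_end(self, trainer, train_losses=None):
        pass


class LossHistory(Callback):
    """Records the losses reported at the end of each epoch."""

    def __init__(self):
        self.history = []

    def on_epoch_end(self, trainer, train_losses=None):
        self.history.append(train_losses)


class TinyTrainer:
    def __init__(self, callbacks):
        self.callbacks = callbacks

    def _fire(self, hook, **kwargs):
        # Call the named hook on every callback, in registration order.
        for callback in self.callbacks:
            getattr(callback, hook)(self, **kwargs)

    def fit(self, epochs):
        for epoch in range(epochs):
            self._fire("on_epoch_start")
            losses = {"loss": 1.0 / (epoch + 1)}  # placeholder for a real epoch
            self._fire("on_epoch_end", train_losses=losses)


history_cb = LossHistory()
TinyTrainer([history_cb]).fit(epochs=3)
```

A callback only needs to override the hooks it cares about, since the base class supplies no-op defaults for the rest.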
from campd.training.summary import Summary
from campd.training.registry import SUMMARIES

@SUMMARIES.register("MySummary")
class MySummary(Summary):
    def __init__(self, every_n_steps=1000):
        super().__init__(every_n_steps=every_n_steps)

    def _run(self, model, train_dataloader, val_dataloader, step):
        # Generate samples, compute metrics, return dict/figures
        return {"my_metric": 0.95}

from campd.training.objectives.base import TrainingObjective
from campd.training.registry import OBJECTIVES

@OBJECTIVES.register("MyObjective")
class MyObjective(TrainingObjective):
    @classmethod
    def from_config(cls, cfg):
        return cls(cfg)

    def step(self, model, batch):
        # Return: (losses_dict, model_output, info_dict)
        model_out = ...  # model forward pass (elided)
        loss = ...       # loss computed from model_out and batch (elided)
        return {"my_loss": loss}, model_out, {}

from campd.experiments.validators import Validator, VALIDATORS
@VALIDATORS.register("MyValidator")
class MyValidator(Validator):
    def validate(self, batch, output_dir):
        # Return dict of validation metrics
        return {"success_rate": 0.85}

| Key | Class | Description |
|---|---|---|
| "train" | TrainExperiment | Full training pipeline (data → model → fit) |
| "inference" | InferenceExperiment | Load checkpoint & sample trajectories |
| Key | Description |
|---|---|
| "PrinterCallback" | Logs training start/end messages |
| "EMACallback" | Exponential moving average of model weights |
| "CheckpointCallback" | Saves checkpoints (best, last, periodic) |
| "WandBCallback" | Logs metrics/artifacts to Weights & Biases |
| "EarlyStoppingCallback" | Stops training when validation loss plateaus |
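As background for the EMACallback row, the snippet below shows the textbook exponential-moving-average update on a dictionary of scalar parameters. It is a generic sketch, not the callback's code: ema_update is a hypothetical name, real weights would be tensors, and whether EMACallback applies warm-up or other refinements is not stated here.

```python
def ema_update(shadow, params, decay=0.995):
    """One EMA step: shadow <- decay * shadow + (1 - decay) * params."""
    return {
        name: decay * shadow[name] + (1.0 - decay) * value
        for name, value in params.items()
    }


# Toy run with decay=0.5 so the effect is easy to follow by hand:
# 0.0 -> 0.5 -> 0.75 -> 0.875
shadow = {"w": 0.0}
for _ in range(3):
    shadow = ema_update(shadow, {"w": 1.0}, decay=0.5)
```

With a decay close to 1, such as the 0.995 used in the configuration example, the averaged weights change slowly and smooth out step-to-step noise in the raw parameters.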

| Key | Description |
|---|---|
| "DiffusionObjective" | Standard diffusion loss (ε, sample, or v) |

| Key | Description |
|---|---|
| "WeightedL1" | Weighted L1 loss |
| "WeightedL2" | Weighted L2 loss |
| "MSE" | nn.MSELoss |
| "L1" | nn.L1Loss |
- CUDA graph errors: If use_cuda_graph is set to true and you get runtime errors, ensure your PyTorch CUDA version matches the system CUDA version. Also verify that all tensor shapes remain constant across batches (CUDA graphs require fixed shapes).
- KeyError: Unknown 'X' in registry 'Y': The component X is not registered. Ensure:
  - The module defining the component is imported before the registry lookup.
  - The module is listed in the dependencies section of your config.
  - The @REGISTRY.register("X") decorator is present on the class.
If your issue isn't covered above, please open a GitHub issue with a minimal reproducible example and the full error traceback.
See LICENSE for details.