一个用 Rust 编写的高性能、可配置的任务调度系统。
- 🎯 YAML 配置驱动 - 通过简单的 YAML 文件定义任务
- ⏰ 灵活的调度 - 支持 cron 定时任务和立即执行任务
- 🔄 热重载 - 配置文件变化时自动重新加载
- 🧩 模块化设计 - 任务可以调用其他模块,支持参数传递
- 🛠️ 丰富的内置函数 - 字符串操作、数学运算、文件操作等
- 📝 用户定义函数 - 在配置文件中定义自定义函数
- 🔀 条件执行 - 支持基于条件的动作执行
- 📁 文件操作 - 创建、读取、写入、删除文件
- 🔁 循环控制 - for_each 循环和流程控制(break, continue, return)
- ⚙️ 全局配置 - 集中管理数据库 DSN、Redis 连接等全局配置,避免重复定义
dsl-scheduler/
├── config/ # 配置文件目录
│ ├── examples/ # 完整示例配置
│ │ ├── comprehensive_features.yaml # 综合功能演示
│ │ ├── global_config_usage.yaml # 全局配置使用演示
│ │ └── production_tasks.yaml # 生产环境任务示例
│ ├── templates/ # 配置模板
│ │ ├── simple_task.yaml # 简单任务模板
│ │ ├── task_with_module.yaml # 带模块调用的任务模板
│ │ └── control_flow.yaml # 控制流程模板
│ └── README.md # 配置说明文档
├── examples/ # Rust 代码示例
│ ├── database_comprehensive_demo.rs # 数据库功能完整演示
│ ├── full_scheduler_demo.rs # 完整调度器演示
│ ├── global_config_demo.rs # 全局配置功能演示
│ ├── variable_lifecycle_demo.rs # 变量生命周期演示
│ └── README.md # 示例说明文档
├── tests/ # 测试文件
│ ├── comprehensive_integration_tests.rs # 综合集成测试
│ └── integration_tests.rs # 基础集成测试
├── src/ # 源代码
│ ├── core/ # 核心功能模块
│ │ ├── actions/ # 动作执行器
│ │ ├── config.rs # 配置解析(含单元测试)
│ │ ├── context.rs # 执行上下文(含单元测试)
│ │ ├── executor.rs # 调度执行器
│ │ ├── scheduler.rs # 任务调度器
│ │ └── file_watcher.rs # 文件监控
│ ├── api.rs # API 接口
│ ├── lib.rs # 库入口
│ └── main.rs # 程序入口
├── QUICKSTART.md # 快速开始指南
└── README.md # 项目说明文档
graph TB
%% 用户层
User[👤 用户] --> CLI[🖥️ CLI / main.rs]
User --> API[🔌 API接口]
%% 配置层
CLI --> ConfigFiles[📄 YAML配置文件]
ConfigFiles --> |热重载| FileWatcher[👁️ 文件监控器]
%% 核心层
CLI --> Scheduler[⚙️ 调度器管理器]
API --> Scheduler
FileWatcher --> Scheduler
Scheduler --> ConfigParser[📋 配置解析器]
Scheduler --> TaskExecutor[🚀 任务执行器]
%% 执行层
TaskExecutor --> Context[📦 执行上下文]
TaskExecutor --> ActionSystem[🎯 动作系统]
%% 动作系统详细分解
ActionSystem --> FlowActions[🔄 流程控制]
ActionSystem --> FileActions[📁 文件操作]
ActionSystem --> DatabaseActions[🗄️ 数据库操作]
ActionSystem --> HttpActions[🌐 HTTP请求]
ActionSystem --> FunctionActions[⚡ 函数调用]
%% 配置与上下文
ConfigParser --> GlobalConfig[🌍 全局配置]
GlobalConfig --> Context
Context --> Variables[📊 变量管理]
Context --> Templates[📝 模板渲染]
%% 样式定义
classDef userLayer fill:#e1f5fe
classDef configLayer fill:#f3e5f5
classDef coreLayer fill:#e8f5e8
classDef actionLayer fill:#fff3e0
class User,CLI,API userLayer
class ConfigFiles,FileWatcher,ConfigParser configLayer
class Scheduler,TaskExecutor,Context coreLayer
class ActionSystem,FlowActions,FileActions,DatabaseActions,HttpActions,FunctionActions actionLayer
classDiagram
%% API层
class ConfigParser {
+from_yaml(content, path) Config
}
class DSLScheduler {
+new() Self
+start() Result
+stop() Result
+reload_config() Result
}
%% 核心调度层
class SchedulerManager {
-tasks: HashMap~String, Task~
-file_watcher: FileWatcher
+load_config(path) Result
+start_scheduler() Result
+stop_all_tasks() Result
}
class FileWatcher {
-paths: Vec~PathBuf~
+watch(path) Result
+on_change(callback) Result
}
%% 配置层
class ParsedConfig {
+tasks: Vec~TaskConfig~
+modules: Vec~ModuleConfig~
+global_configs: Vec~GlobalConfig~
+from_yaml(content) Result
}
class GlobalConfig {
+name: String
+databases: Option~DatabaseConfig~
+redis: Option~RedisConfig~
}
%% 执行层
class TaskExecutor {
-action_executor: ActionExecutor
+execute_task(task, context) Result
+execute_action(action, context) Result
}
class ExecutionContext {
+variables: HashMap~String, Value~
+global_config: Option~GlobalConfig~
+get_variable(key) Option~Value~
+set_variable(key, value)
+render_template(template) Result
}
%% 动作执行层
class ActionExecutor {
<<interface>>
+execute_action(action, context) Result
}
class DefaultActionExecutor {
+execute_call_function() Result
+execute_set_variable() Result
+execute_db_query() Result
+execute_http_request() Result
+execute_file_operation() Result
+execute_flow_control() Result
}
%% 关系定义
DSLScheduler --> SchedulerManager
SchedulerManager --> FileWatcher
SchedulerManager --> ParsedConfig
SchedulerManager --> TaskExecutor
ConfigParser --> ParsedConfig
ParsedConfig --> GlobalConfig
TaskExecutor --> ExecutionContext
TaskExecutor --> ActionExecutor
ActionExecutor <|.. DefaultActionExecutor
ExecutionContext --> GlobalConfig
flowchart LR
%% 输入
YAMLFile[📄 YAML配置文件] --> Parser[📋 解析器]
%% 解析阶段
Parser --> Config[⚙️ 解析后配置]
Config --> Tasks[📋 任务列表]
Config --> Modules[🧩 模块列表]
Config --> GlobalConfigs[🌍 全局配置]
%% 调度阶段
Tasks --> Scheduler[⚙️ 调度器]
Scheduler --> |定时触发| TaskQueue[📤 任务队列]
Scheduler --> |立即执行| TaskQueue
%% 执行阶段
TaskQueue --> Executor[🚀 执行器]
GlobalConfigs --> Context[📦 执行上下文]
Modules --> Context
Executor --> Context
%% 动作执行
Context --> ActionFlow{🎯 动作分发}
ActionFlow --> |流程控制| FlowControl[🔄 条件/循环]
ActionFlow --> |数据操作| DataOps[📊 变量/模板]
ActionFlow --> |外部交互| ExternalOps[🌐 文件/数据库/HTTP]
ActionFlow --> |函数调用| Functions[⚡ 内置/自定义函数]
%% 输出
Functions --> Results[📤 执行结果]
ExternalOps --> Results
DataOps --> Results
FlowControl --> Results
📖 详细指南: 查看 QUICKSTART.md 获取完整的快速开始指南
git clone https://github.com/jibenliu/dsl-scheduler.git
cd dsl-scheduler
cargo build --release- 从模板创建配置文件:
# 复制简单任务模板
cp config/templates/simple_task.yaml config/my_task.yaml或直接创建 config/my_task.yaml:
# 立即执行任务
- type: "task"
name: "Hello World"
actions:
- call_function: "print"
params:
message: "Hello from DSL Scheduler!"
# 定时任务 (每分钟执行)
- type: "task"
name: "定时任务"
cron: "0 * * * * *"
actions:
- call_function: "print"
params:
message: "定时任务执行中 - {{timestamp}}"- 运行调度器:
# 运行特定配置文件
cargo run config/my_task.yaml
# 运行配置目录中的所有配置文件
cargo run config/
# 运行示例配置
cargo run config/examples/comprehensive_features.yaml# 运行数据库功能完整演示
cargo run --example database_comprehensive_demo
# 运行完整调度器演示
cargo run --example full_scheduler_demo
# 运行变量生命周期演示
cargo run --example variable_lifecycle_demo
# 运行所有测试
cargo testuse dsl_scheduler::api::SchedulerManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 创建调度器
let manager = SchedulerManager::new().await?;
// 从配置目录启动
manager.start_from_config_dir("./config").await?;
// 保持运行
tokio::signal::ctrl_c().await?;
Ok(())
}- type: "task"
name: "简单任务"
actions:
- call_function: "print"
params:
message: "任务开始"
- call_function: "sleep"
params:
duration: 1000
- call_function: "print"
params:
message: "任务完成"- type: "task"
name: "条件任务"
actions:
- condition: "{{hour}} >= 9 && {{hour}} < 18"
actions:
- call_function: "print"
params:
message: "工作时间内执行"- type: "task"
name: "循环任务"
actions:
- loop_times: 5
actions:
- call_function: "print"
params:
message: "循环第 {{loop_index}} 次"# 定义模块
- type: "module"
name: "发送通知"
params:
- name: "message"
type: "string"
actions:
- call_function: "print"
params:
message: "通知: {{message}}"
# 使用模块的任务
- type: "task"
name: "通知任务"
actions:
- call_module: "发送通知"
params:
message: "系统启动完成"# 定义函数
- type: "function"
name: "calculate_area"
params:
- name: "width"
type: "number"
- name: "height"
type: "number"
return_type: "number"
actions:
- call_function: "math"
params:
operation: "multiply"
a: "{{width}}"
b: "{{height}}"
# 使用函数的任务
- type: "task"
name: "计算任务"
actions:
- call_function: "calculate_area"
params:
width: 10
height: 20# PostgreSQL 查询示例
- type: "task"
name: "用户数据查询"
cron: "0 */10 * * * *" # 每10分钟执行
actions:
- db_dsn: "postgresql://username:password@localhost:5432/dbname"
db_query: "SELECT id, username, email FROM users WHERE active = true LIMIT 10"
for_each_result:
- call_function: "print"
params:
message: "用户: {{username}}, 邮箱: {{email}}"
# MySQL 查询示例
- type: "task"
name: "订单统计"
actions:
- db_dsn: "mysql://root:password@localhost:3306/shop_db"
db_query: |
SELECT DATE(created_at) as order_date,
COUNT(*) as order_count,
SUM(total_amount) as total_sales
FROM orders
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY DATE(created_at)
for_each_result:
- call_function: "print"
params:
message: "{{order_date}}: {{order_count}} 订单, 总额: {{total_sales}}"
# SQLite 查询示例
- type: "task"
name: "SQLite日志分析"
actions:
- db_dsn: "sqlite://./logs.db"
db_query: "SELECT level, message, timestamp FROM logs WHERE level = 'ERROR'"
condition: "{{level}} == 'ERROR'"
for_each_result:
- call_function: "log"
params:
level: "warning"
message: "发现错误日志: {{message}}"使用全局配置可以避免在每个数据库查询中重复定义 DSN,让配置更简洁、更易维护:
# 定义全局配置
- type: config
name: main_db
config:
databases:
dsn: "postgresql://user:pass@localhost:5432/mydb"
pool_size: "10"
timeout: "30"
- type: config
name: redis_cache
config:
redis:
dsn: "redis://localhost:6379"
pool_size: "5"
# 使用全局配置的任务
- type: task
name: "使用全局配置的数据库查询"
actions:
# 创建用户表
- db_query: "CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, name VARCHAR(100), email VARCHAR(255))"
db_config: "main_db" # 引用全局配置,无需重复写 DSN
# 插入数据
- db_query: "INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com')"
db_config: "main_db"
# 查询数据
- db_query: "SELECT * FROM users ORDER BY id"
db_config: "main_db"
for_each_result:
- call_function: "print"
params:
message: "用户: {{name}}, 邮箱: {{email}}"
# 定时任务也可以使用全局配置
- type: task
name: "定时数据检查"
cron: "0 */5 * * * *" # 每5分钟执行一次
actions:
- db_query: "SELECT COUNT(*) as user_count FROM users"
db_config: "main_db" # 所有数据库操作都使用同一个配置
for_each_result:
- call_function: "print"
params:
message: "当前用户数量: {{user_count}}"全局配置的优势:
- ✅ 避免重复 - DSN 只需要定义一次
- ✅ 易于维护 - 修改数据库连接时只需要改一个地方
- ✅ 支持多种配置 - 数据库、Redis、外部 API 等
- ✅ 环境隔离 - 不同环境可以使用不同的全局配置
支持的配置类型:
databases- 数据库连接配置(dsn, pool_size, timeout 等)redis- Redis 连接配置external_apis- 外部 API 配置(api_key, base_url 等)
使用方式:
- 在数据库查询中使用
db_config: "配置名"替代db_dsn: "连接字符串" - 系统会自动从全局配置中查找对应的 DSN
print- 打印消息到控制台sleep- 暂停执行指定毫秒数
timestamp- 获取 Unix 时间戳uuid- 生成 UUID 字符串
random_number- 生成随机数(可指定范围)math- 数学运算(add/subtract/multiply/divide)
string_operations- 字符串操作(连接、大小写转换、去空格等)
array_operations- 数组操作(长度、追加、切片等)
- 支持 SQLite、PostgreSQL、MySQL 数据库查询
- 使用
db_dsn和db_query配置数据库操作 - 支持
for_each_result遍历查询结果
dsl-scheduler/
├── config/ # 配置文件目录
│ ├── examples/ # 完整示例配置
│ │ ├── comprehensive_features.yaml # 综合功能演示
│ │ ├── global_config_usage.yaml # 全局配置使用演示
│ │ └── production_tasks.yaml # 生产环境任务示例
│ ├── templates/ # 配置模板
│ │ ├── simple_task.yaml # 简单任务模板
│ │ ├── task_with_module.yaml # 带模块调用的任务模板
│ │ └── control_flow.yaml # 控制流程模板
│ └── README.md # 配置说明文档
├── examples/ # Rust 代码示例
│ ├── database_comprehensive_demo.rs # 数据库功能完整演示
│ ├── full_scheduler_demo.rs # 完整调度器演示
│ ├── global_config_demo.rs # 全局配置功能演示
│ ├── variable_lifecycle_demo.rs # 变量生命周期演示
│ ├── simple_database_test.rs # 简单数据库测试
│ ├── variable_lifecycle_demo.rs # 变量生命周期演示
│ ├── variable_lifecycle_explanation.rs # 变量生命周期说明
│ └── README.md # 示例说明文档
├── tests/ # 测试文件
│ ├── comprehensive_integration_tests.rs # 综合集成测试
│ └── integration_tests.rs # 基础集成测试
├── src/ # 源代码
│ ├── core/ # 核心功能模块
│ │ ├── actions/ # 动作执行器
│ │ ├── config.rs # 配置解析(含单元测试)
│ │ ├── context.rs # 执行上下文(含单元测试)
│ │ ├── executor.rs # 调度执行器
│ │ ├── scheduler.rs # 任务调度器
│ │ └── file_watcher.rs # 文件监控
│ ├── api.rs # API 接口
│ ├── lib.rs # 库入口
│ └── main.rs # 程序入口
└── README.md # 项目说明文档
# 运行所有测试
cargo test
# 运行示例
cargo run --example full_scheduler_demo
cargo run --example test_new_configsfull_scheduler_demo- 完整的调度器演示,包含文件监控test_new_configs- 多任务配置测试test_config_parsing- 配置解析测试test_module_config- 模块配置测试
- type: "task"
name: "数据库任务"
actions:
- db_query: "SELECT * FROM users WHERE active = 1"
db_name: "main_db"
for_each_result:
- call_function: "print"
params:
message: "用户: {{user_name}}"- type: "task"
name: "API调用"
actions:
- http_request:
url: "https://api.example.com/data"
method: "GET"
headers:
Authorization: "Bearer {{token}}"欢迎提交 Issue 和 Pull Request!
- Fork 本仓库
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 打开 Pull Request
本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。
DSL 调度器 遵循以下核心设计原则:
- 用户接口层 - CLI 和 API 接口,提供友好的用户交互
- 配置解析层 - YAML 配置文件解析和验证,支持热重载
- 核心调度层 - 任务调度和生命周期管理
- 执行引擎层 - 动作执行和上下文管理
- 插件系统层 - 可扩展的动作执行器
src/
├── api.rs // 🔌 外部 API 接口层
├── core/
│ ├── scheduler.rs // ⚙️ 核心调度器
│ ├── executor.rs // 🚀 任务执行引擎
│ ├── context.rs // 📦 执行上下文管理
│ ├── config.rs // 📋 配置解析与验证
│ ├── file_watcher.rs // 👁️ 文件监控服务
│ └── actions/ // 🎯 动作执行器插件
│ ├── flow.rs // 🔄 流程控制
│ ├── function.rs // ⚡ 函数调用
│ ├── database.rs // 🗄️ 数据库操作
│ ├── file.rs // 📁 文件操作
│ └── http.rs // 🌐 HTTP 请求- 声明式配置 - 用户通过 YAML 声明"做什么"而不是"怎么做"
- 热重载机制 - 配置变更自动检测和重载,无需重启
- 全局配置管理 - 统一管理数据库连接、API 密钥等配置
- 配置验证 - 启动时验证配置正确性,提前发现错误
graph LR
UserAction[用户自定义动作] --> ActionTrait[ActionExecutor Trait]
DatabaseAction[数据库动作] --> ActionTrait
HttpAction[HTTP动作] --> ActionTrait
FileAction[文件动作] --> ActionTrait
FunctionAction[函数动作] --> ActionTrait
ActionTrait --> Executor[任务执行器]
- 分层错误处理 - 每层都有适当的错误处理机制
- 优雅降级 - 单个任务失败不影响其他任务执行
- 详细错误信息 - 提供清晰的错误定位和解决建议
- 资源清理 - 确保异常情况下资源正确释放
- 异步执行 - 任务并发执行,充分利用系统资源
- 智能调度 - 避免资源冲突,优化执行顺序
- 内存管理 - 合理的变量生命周期管理
- 事件防抖 - 文件监控事件防抖,避免频繁重载
- ✅ 人类可读 - 相比 JSON 更易读写
- ✅ 注释支持 - 可以添加详细的配置说明
- ✅ 层次结构 - 天然支持复杂的嵌套配置
- ✅ 生态成熟 - Rust 社区有完善的 YAML 库支持
pub trait ActionExecutorTrait {
fn execute_action(&self, action: &Action, context: &mut ExecutionContext) -> anyhow::Result<()>;
}- ✅ 插件化 - 易于添加新的动作类型
- ✅ 测试友好 - 可以轻松 mock 动作执行器
- ✅ 类型安全 - 编译期保证接口一致性
# 全局配置 - 定义一次,到处使用
- type: config
name: main_db
config:
databases:
dsn: "postgresql://user:pass@localhost/db"
# 任务中引用 - 简洁且易维护
- type: task
actions:
- db_query: "SELECT * FROM users"
db_config: "main_db" # 引用全局配置- 变量隔离 - 每个任务有独立的变量空间
- 全局共享 - 全局配置在所有任务间共享
- 模板渲染 - 支持 Handlebars 模板语法
- 类型转换 - 智能的类型推断和转换
- 在
src/core/actions/下创建新模块 - 实现
ActionExecutorTrait - 在
DefaultActionExecutor中注册 - 添加相应的配置结构体
- 编写单元测试
- 扩展
GlobalConfig结构体 - 更新配置解析逻辑
- 在执行上下文中添加访问方法
- 编写集成测试
- 异步执行 - 任务并发执行,不阻塞主线程
- 内存效率 - 合理的资源管理和生命周期
- 热重载 - 配置变化时智能重载,避免重复启动
- 事件防抖 - 文件监控事件防抖,避免频繁重载
- 数据库集成支持(已完成:SQLite、PostgreSQL、MySQL)
- HTTP 客户端支持
- 消息队列集成
- Web 管理界面
- 监控和告警系统
- 函数库和包管理
- 更多内置函数(JSON、文件操作等)
-
拖拽式页面配置逻辑
- 实现基于 Web 的可视化配置编辑器
- 支持拖拽组件创建任务流程
- 实时预览配置效果
- 可视化编辑动作参数和条件
- 任务依赖关系图形化展示
-
配置自动生成与解析
- 保存可视化配置后自动生成 YAML 文件
- 根据 YAML 文件反向解析生成页面展示
- 配置版本管理和回滚功能
- 配置模板库和快速创建向导
-
更多数据库支持
- ClickHouse 支持(时序数据分析)
- TiDB 支持(分布式 MySQL)
- DuckDB 支持(嵌入式分析)
- InfluxDB 支持(时序数据库)
- CockroachDB 支持(分布式 SQL)
-
常用中间件支持
- Redis - 缓存操作、消息队列、分布式锁
- 基础 CRUD 操作
- 发布/订阅消息
- 分布式锁实现
- 缓存策略配置
- MongoDB - 文档数据库操作
- 集合 CRUD 操作
- 聚合管道支持
- 索引管理
- Elasticsearch - 搜索和分析
- 文档索引和搜索
- 聚合查询支持
- 索引模板管理
- Redis - 缓存操作、消息队列、分布式锁
-
分布式任务调度
- 多节点任务分发和负载均衡
- 节点故障转移和自动恢复
- 任务执行状态同步
- 分布式锁防止重复执行
-
集群管理功能
- 节点注册和发现
- 集群状态监控
- 配置中心集成
- 分布式日志收集
-
一次性任务持久化
- 任务执行状态持久化存储
- 服务重启后恢复任务状态
- 避免已完成任务重复执行
- 任务执行历史记录
-
任务状态管理
- 任务执行进度跟踪
- 任务暂停、恢复、取消功能
- 任务执行结果持久化
- 任务失败重试机制配置
-
Web 服务框架
- 基于配置文件定义 HTTP 接口
- 支持 RESTful API 自动生成
- 请求参数验证和转换
- 响应格式定制
-
API 管理功能
- API 文档自动生成
- 接口版本管理
- 请求限流和权限控制
- API 监控和统计
-
监控指标
- 任务执行成功率统计
- 系统资源使用监控
- 错误率和延迟监控
- 自定义业务指标收集
-
告警系统
- 任务失败告警
- 系统异常告警
- 性能阈值告警
- 多渠道告警支持(邮件、短信、钉钉等)
-
执行引擎优化
- 任务并行度智能调度
- 内存池和对象复用
- 异步 I/O 优化
- 编译时优化配置验证
-
存储优化
- 配置文件压缩和缓存
- 任务状态增量更新
- 日志轮转和清理策略
用 ❤️ 和 🦀 Rust 构建