一个基于 Go 的数据库 ETL 工具,用于从 mssql / postgres / greenplum 读取数据,经过统一转换后写入目标库。
CREATE SCHEMA IF NOT EXISTS manager;
CREATE TABLE IF NOT EXISTS manager.job_data_sync_v2
(
job_id serial primary key,
src_schema_name character varying(100) COLLATE pg_catalog."default",
src_table_name character varying(100) COLLATE pg_catalog."default",
dst_schema_name character varying(100) COLLATE pg_catalog."default",
dst_table_name character varying(100) COLLATE pg_catalog."default",
src_select_statement text COLLATE pg_catalog."default",
src_where_statement text COLLATE pg_catalog."default",
sync_mode character varying(10) COLLATE pg_catalog."default",
src_incr_field character varying(100) COLLATE pg_catalog."default",
dst_pk character varying(100) COLLATE pg_catalog."default",
fields_mapping jsonb,
incr_point text COLLATE pg_catalog."default",
cdt timestamp without time zone,
udt timestamp without time zone,
remark text COLLATE pg_catalog."default",
inuse boolean,
src_conn_name character varying(50) COLLATE pg_catalog."default",
src_db_name character varying(50) COLLATE pg_catalog."default",
job_name character varying(50) COLLATE pg_catalog."default",
src_conn_id integer,
dst_distributed_by character varying(100) COLLATE pg_catalog."default",
created_by character varying(50) COLLATE pg_catalog."default",
modified_by character varying(50) COLLATE pg_catalog."default",
src_rawsql text COLLATE pg_catalog."default"
)
TABLESPACE pg_default;
程序默认从项目根目录读取 config.yaml,也可通过 -config 参数指定路径。
name: my_etl_job # 全局任务名,必填
comment: 可选备注
error_policy: abort # abort(默认)或 continue
databases:
- name: source-mssql
type: mssql
host: mssql.example.internal
port: 1433
user: <USERNAME>
password: <PASSWORD>
database: <DATABASE_NAME>
tasks:
- name: sample_metric_sync
type: query
sources:
- dbname: source-mssql
batch_size: 10000
table: dbo.source_table
incr_field: updated_at
target:
dbname: target-pg
table: target_schema.target_metric
mode: merge
pk: pk_col_1,pk_col_2name:全局任务名,必填。写入 watermark 表时作为job_name的默认值(可被tasks[].name覆盖)。comment:可选备注。error_policy:任务失败策略,abort(默认,遇错立即退出)或continue(跳过失败任务继续执行)。databases:数据库连接定义列表。tasks:任务定义列表。
数据库连接定义列表。
每项字段如下:
| 字段 | 说明 |
|---|---|
name |
数据库别名,供 sources[].dbname 和 target.dbname 引用 |
type |
数据库类型:mssql、postgres、greenplum |
host |
主机地址 |
port |
端口 |
user |
用户名 |
password |
密码 |
database |
数据库名 |
任务定义列表。每个任务将多个 sources 的数据写入一个 target。
| 字段 | 说明 |
|---|---|
name |
任务名称,配置了 incr_field 时必填,写入 manager.job_data_sync_v2.job_name |
type |
任务类型,目前支持 query |
comment |
可选备注 |
sources |
源配置列表,见下节 |
target |
目标配置,见下节 |
每个 source 表示一个源库读取定义。
| 字段 | 必填 | 说明 |
|---|---|---|
dbname |
✅ | 引用 databases[].name |
sql |
二选一 | 自定义查询语句 |
table |
二选一 | 直接读取的表名,格式 schema.table;仅 SQL Server 支持 db.schema.table |
where_statement |
附加过滤条件;使用 table 时直接拼到 WHERE,使用 sql 时作为外层过滤条件追加 |
|
fields_mapping |
字段投影/映射;仅支持简单 map,格式为 源字段或表达式: 目标字段 |
|
batch_size |
每批读取行数,默认 10000 |
|
incr_field |
增量抽取字段名(日期/时间类型),配合 watermark 实现断点续传 | |
incr_point |
增量起点,通常由程序从 watermark 自动回填,也可手动指定初始值 | |
order_by |
查询排序表达式,启用 commit_batch_size 时若未指定则自动补为 <incr_field> ASC |
sql和table至少配置一个,不能同时配置。- 使用
table时,watermark 的src_schema_name/src_table_name直接从table解析;只有 SQL Server 源支持三段式db.schema.table。 - 使用
table时,优先保留table形态,where_statement/fields_mapping在 reader 阶段再拼装查询,不会在配置加载阶段改写成 SQL。 - 使用
sql时,watermark 的源标识使用 SQL 文本本身(写入manager.job_data_sync_v2.src_rawsql)。
当配置了 incr_field 时,必须满足:
tasks[].name必须配置。- 启用
commit_batch_size时,查询必须有序(框架自动补ORDER BY incr_field ASC,或手动指定order_by)。
| 字段 | 必填 | 说明 |
|---|---|---|
dbname |
✅ | 引用 databases[].name |
table |
✅ | 目标表,建议使用 schema.table 格式 |
mode |
✅ | 写入模式,见下节 |
pk |
merge 时必填 | 主键列列表,多列用逗号分隔,如 id 或 id,tenant_id |
commit_batch_size |
分段提交粒度(批次数),0 表示整个任务在单个事务中完成(默认);设置后每 N 个 batch 提交一次事务并更新水位,适用于超大表 |
| 值 | 说明 |
|---|---|
copy |
直接 COPY 到目标表,不清空已有数据,仅执行追加写入 |
full |
先清空目标表,再将本次抽取结果全量覆盖写入 |
append |
与 copy 一样追加写入,但要求 incr_field,并在写入后更新水位 |
merge |
先写入临时表,再按 pk 做 DELETE + INSERT(支持增量 upsert) |
name: etl_copy_demo
tasks:
- type: query
sources:
- dbname: source-mssql
batch_size: 10000
sql: "SELECT * FROM [dbo].[source_table]"
target:
dbname: target-pg
table: target_schema.target_table_1
mode: copyname: etl_table_copy
tasks:
- type: query
sources:
- dbname: source-mssql
batch_size: 10000
table: "dbo.source_table_2"
where_statement: "is_deleted = 0"
fields_mapping:
source_id: id
CustomerName: customer_name
updated_at: updated_at
target:
dbname: target-pg
table: target_schema.target_table_2
mode: copyname: etl_incremental
tasks:
- name: order_sync
type: query
sources:
- dbname: source-mssql
batch_size: 10000
table: dbo.orders
incr_field: updated_at
target:
dbname: target-pg
table: ods.orders
mode: merge
pk: order_idname: etl_large_table
tasks:
- name: big_table_sync
type: query
sources:
- dbname: source-mssql
batch_size: 10000
table: dbo.big_table
incr_field: updated_at
# order_by 可省略,自动补为 "updated_at ASC"
target:
dbname: target-pg
table: ods.big_table
mode: merge
pk: id
commit_batch_size: 100 # 每 100 批(约 100 万行)提交一次事务当 source 配置了 incr_field 时,程序会读写 manager.job_data_sync_v2 表来记录同步进度(水位)。
注意:数据库表中的字段名称维持建表语句原样,不随配置 key 的改名而变化。
| 场景 | 匹配字段 |
|---|---|
table source |
job_name + src_schema_name + src_table_name + dst_schema_name + dst_table_name |
sql source |
job_name + src_rawsql + dst_schema_name + dst_table_name |
| 数据库字段 | 来源 |
|---|---|
job_name |
tasks[].name |
src_schema_name / src_table_name |
sources[].table 解析 |
src_rawsql |
sources[].sql 文本 |
dst_schema_name / dst_table_name |
target.table 解析 |
incr_point |
程序运行时写入,每段(或整体)提交时更新为当前批次 MAX(incr_field) |
sync_mode |
target.mode |
src_incr_field |
sources[].incr_field |
dst_pk |
target.pk |
- 查
manager.job_data_sync_v2.incr_point - 若无记录或为空 → 查目标表
MAX(incr_field) - 若目标表也无数据 → 根据字段名推断兜底值(时间类字段返回
1970-01-01 00:00:00.000,其他返回1)
# 单次执行(默认)
go run . -config config.yaml
# 后台服务模式(每 5 分钟执行一次)
go run . -config config.yaml -mode server -interval 5m或构建后执行:
make build
./bin/db-etl-linux-amd64 -config config.yaml