title | weight | type | aliases | |
---|---|---|---|---|
Data Pipeline |
1 |
docs |
|
Since events in Flink CDC flow from the upstream to the downstream in a pipeline manner, the whole ETL task is referred as a Data Pipeline.
A pipeline corresponds to a chain of operators in Flink.
To describe a Data Pipeline, the following parts are required:
- [source]({{< ref "docs/core-concept/data-source" >}})
- [sink]({{< ref "docs/core-concept/data-sink" >}})
- pipeline
the following parts are optional:
- [route]({{< ref "docs/core-concept/route" >}})
- [transform]({{< ref "docs/core-concept/transform" >}})
We could use following yaml file to define a concise Data Pipeline describing synchronize all tables under MySQL app_db database to Doris :
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
transform:
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
We could use following yaml file to define a complicated Data Pipeline describing synchronize all tables under MySQL app_db database to Doris and give specific target database name ods_db and specific target table name prefix ods_ :
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
transform:
- source-table: adb.web_order01
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 20 AND order_id > 200
description: project fields and filter
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
user-defined-function:
- name: addone
classpath: com.example.functions.AddOneFunctionClass
- name: format
classpath: com.example.functions.FormatFunctionClass
The following config options of Data Pipeline level are supported:
parameter | meaning | optional/required |
---|---|---|
name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional |
parallelism | The global parallelism of the pipeline. Defaults to 1. | optional |
local-time-zone | The local time zone defines current session time zone id. | optional |