Skip to content

Latest commit

 

History

History
136 lines (117 loc) · 4.59 KB

data-pipeline.md

File metadata and controls

136 lines (117 loc) · 4.59 KB
title weight type aliases
Data Pipeline
1
docs
/core-concept/data-pipeline/

Definition

Since events in Flink CDC flow from the upstream to the downstream in a pipeline manner, the whole ETL task is referred as a Data Pipeline.

Parameters

A pipeline corresponds to a chain of operators in Flink.
To describe a Data Pipeline, the following parts are required:

  • [source]({{< ref "docs/core-concept/data-source" >}})
  • [sink]({{< ref "docs/core-concept/data-sink" >}})
  • pipeline

the following parts are optional:

  • [route]({{< ref "docs/core-concept/route" >}})
  • [transform]({{< ref "docs/core-concept/transform" >}})

Example

Only required

We could use following yaml file to define a concise Data Pipeline describing synchronize all tables under MySQL app_db database to Doris :

   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   transform:
     - source-table: adb.web_order01
       projection: \*, UPPER(product_name) as product_name
       filter: id > 10 AND order_id > 100
       description: project fields and filter
     - source-table: adb.web_order02
       projection: \*, UPPER(product_name) as product_name
       filter: id > 20 AND order_id > 200
       description: project fields and filter

   route:
     - source-table: app_db.orders
       sink-table: ods_db.ods_orders
     - source-table: app_db.shipments
       sink-table: ods_db.ods_shipments
     - source-table: app_db.products
       sink-table: ods_db.ods_products

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2

With optional

We could use following yaml file to define a complicated Data Pipeline describing synchronize all tables under MySQL app_db database to Doris and give specific target database name ods_db and specific target table name prefix ods_ :

   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   transform:
     - source-table: adb.web_order01
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 10 AND order_id > 100
       description: project fields and filter
     - source-table: adb.web_order02
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 20 AND order_id > 200
       description: project fields and filter

   route:
     - source-table: app_db.orders
       sink-table: ods_db.ods_orders
     - source-table: app_db.shipments
       sink-table: ods_db.ods_shipments
     - source-table: app_db.products
       sink-table: ods_db.ods_products

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2
     user-defined-function:
       - name: addone
         classpath: com.example.functions.AddOneFunctionClass
       - name: format
         classpath: com.example.functions.FormatFunctionClass

Pipeline Configurations

The following config options of Data Pipeline level are supported:

parameter meaning optional/required
name The name of the pipeline, which will be submitted to the Flink cluster as the job name. optional
parallelism The global parallelism of the pipeline. Defaults to 1. optional
local-time-zone The local time zone defines current session time zone id. optional