Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specifies which connector to use; here it must be `mongodb-cdc`. |
hosts | required | (none) | String | The comma-separated list of hostname and port pairs of the MongoDB servers, e.g. `localhost:27017,localhost:27018`. |
username | optional | (none) | String | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
password | optional | (none) | String | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
database | optional | (none) | String | Name of the database to watch for changes. If not set, all databases are captured. The value can also be a regular expression, in which case every database matching it is monitored. |
collection | optional | (none) | String | Name of the collection in the database to watch for changes. If not set, all collections are captured. The value can also be a regular expression, in which case every collection whose fully-qualified identifier matches it is monitored. |
connection.options | optional | (none) | String | The ampersand-separated connection options of MongoDB, e.g. `replicaSet=test&connectTimeoutMS=300000`. |
errors.tolerance | optional | none | String | Whether to continue processing messages if an error is encountered. Accepted values are `none` and `all`. When set to `none`, the connector reports an error and blocks processing of the remaining records when it encounters an error. When set to `all`, the connector silently ignores any bad messages. |
errors.log.enable | optional | true | Boolean | Whether details of failed operations should be written to the log file. |
copy.existing | optional | true | Boolean | Whether to copy existing data from the source collections. |
copy.existing.pipeline | optional | (none) | String | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient. For example, `[{"$match": {"closed": "false"}}]` ensures that only documents whose `closed` field is `"false"` are copied. |
copy.existing.max.threads | optional | Number of available processors | Integer | The number of threads to use when copying existing data. |
copy.existing.queue.size | optional | 16000 | Integer | The maximum size of the queue to use when copying data. |
batch.size | optional | 0 | Integer | Change stream cursor batch size. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. The default, 0, means the server's default value is used. |
poll.max.batch.size | optional | 1000 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. |
poll.await.time.ms | optional | 1500 | Integer | The amount of time, in milliseconds, to wait before checking for new results on the change stream. |
heartbeat.interval.ms | optional | 0 | Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
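
The `hosts`, `username`, `password` and `connection.options` values in the options table correspond to the parts of a standard MongoDB connection string. The sketch below assembles them in the usual `mongodb://user:password@hosts/?options` layout. `build_connection_string` is a hypothetical helper, and the connector's own internal construction may differ in detail. Credentials are percent-encoded because characters such as `@`, `:` and `/` would otherwise be read as URI delimiters.

```python
from typing import Optional
from urllib.parse import quote


def build_connection_string(
    hosts: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    connection_options: Optional[str] = None,
) -> str:
    """Hypothetical helper: combine connector options into a mongodb:// URI."""
    credentials = ""
    if username is not None:
        # safe="" makes quote() percent-encode '@', ':' and '/' as well.
        credentials = quote(username, safe="")
        if password is not None:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    uri = f"mongodb://{credentials}{hosts}"
    if connection_options:
        # connection.options is already ampersand-separated, e.g. "a=1&b=2".
        uri += "/?" + connection_options
    return uri


print(build_connection_string(
    "localhost:27017,localhost:27018",
    username="flink",
    password="p@ss:w/rd",
    connection_options="replicaSet=test&connectTimeoutMS=300000",
))
```

Leaving `username` and `password` unset produces a URI without credentials, which matches the table's note that they are only needed when authentication is enabled.
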
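
The regular-expression forms of `database` and `collection` can be illustrated by filtering a list of namespaces. The sketch below assumes full-match semantics (`re.fullmatch`). It matches a `database` pattern against the database name and a `collection` pattern against the fully-qualified `database.collection` identifier. It does not model how plain, non-regex collection names are qualified. The helper name and namespaces are hypothetical, and the connector's exact matching rules may differ.

```python
import re
from typing import Iterable, List, Optional


def select_namespaces(
    namespaces: Iterable[str],
    database: Optional[str] = None,
    collection: Optional[str] = None,
) -> List[str]:
    """Hypothetical model of regex-based capture; an unset option matches everything."""
    selected = []
    for namespace in namespaces:
        # Database names cannot contain '.', so the first dot separates db from collection.
        db_name = namespace.split(".", 1)[0]
        if database is not None and re.fullmatch(database, db_name) is None:
            continue
        # The collection pattern is tested against the fully-qualified identifier.
        if collection is not None and re.fullmatch(collection, namespace) is None:
            continue
        selected.append(namespace)
    return selected


namespaces = ["shop.orders_2023", "shop.orders_2024", "shop.users", "logs.orders_2023"]
print(select_namespaces(namespaces, database="shop", collection=r"shop\.orders_.*"))
```

Because the collection pattern is applied to the qualified name, the dot between database and collection is escaped (`\.`) so it only matches a literal dot.
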
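
The difference between the two `errors.tolerance` values can be modeled with a small processing loop. This is a hypothetical illustration of the documented policy, not connector code. With `none`, the first failing record stops processing and the error propagates. With `all`, failing records are dropped without comment and the rest are processed.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def process(records: Iterable[T], handle: Callable[[T], R], tolerance: str = "none") -> List[R]:
    """Hypothetical model of the errors.tolerance policy."""
    if tolerance not in ("none", "all"):
        raise ValueError("errors.tolerance must be 'none' or 'all'")
    results: List[R] = []
    for record in records:
        try:
            results.append(handle(record))
        except Exception:
            if tolerance == "none":
                # 'none': surface the error; no further records are processed.
                raise
            # 'all': silently skip the bad record and continue.
    return results


print(process(["1", "x", "3"], int, tolerance="all"))
```
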
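
As a sketch of how the options are used together, the helper below renders a Flink SQL `CREATE TABLE` statement for the `mongodb-cdc` connector. Only the option keys come from the options table. The helper names, the table name, the schema (including the `_id` primary key), and the option values are assumptions for illustration. Option values are embedded as SQL string literals, so any single quotes are doubled. This matters for `copy.existing.pipeline`, whose JSON value is produced here with `json.dumps`.

```python
import json
from typing import Dict, List


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_mongodb_cdc_ddl(table: str, columns: List[str], options: Dict[str, str]) -> str:
    """Hypothetical helper: render a CREATE TABLE statement with a WITH (...) clause."""
    if options.get("connector") != "mongodb-cdc":
        raise ValueError("the 'connector' option must be 'mongodb-cdc'")
    if "hosts" not in options:
        raise ValueError("the 'hosts' option is required")
    cols = ",\n  ".join(columns)
    opts = ",\n  ".join(f"{sql_literal(k)} = {sql_literal(v)}" for k, v in options.items())
    return f"CREATE TABLE {table} (\n  {cols}\n) WITH (\n  {opts}\n)"


# The pipeline is a JSON array serialized to a string option value.
pipeline = json.dumps([{"$match": {"closed": "false"}}])
ddl = build_mongodb_cdc_ddl(
    "orders",
    ["_id STRING", "order_id INT", "closed STRING", "PRIMARY KEY (_id) NOT ENFORCED"],
    {
        "connector": "mongodb-cdc",
        "hosts": "localhost:27017,localhost:27018",
        "database": "shop",
        "collection": "orders",
        "copy.existing.pipeline": pipeline,
    },
)
print(ddl)
```

The resulting string could then be submitted through Flink's SQL client or a table environment. The validation in the helper only checks the two required options.
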

The following metadata can be declared as read-only (`VIRTUAL`) columns in a table definition.

Key | Data type | Description |
---|---|---|
database_name | STRING NOT NULL | Name of the database that contains the row. |
collection_name | STRING NOT NULL | Name of the collection that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from a snapshot of the collection instead of the change stream, the value is always 0. |
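
Metadata columns are declared in a Flink SQL schema with the `METADATA FROM '<key>' VIRTUAL` clause. The sketch below renders such column definitions for the keys in the metadata table. It also interprets an `op_ts` value, assumed here to be epoch milliseconds, as a UTC datetime. The value 0 carried by snapshot records therefore corresponds to the Unix epoch. The helper names and column aliases are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Column types per metadata key, taken from the metadata table
# (NOT NULL is left off here for brevity; this is an illustrative choice).
METADATA_TYPES = {
    "database_name": "STRING",
    "collection_name": "STRING",
    "op_ts": "TIMESTAMP_LTZ(3)",
}

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def metadata_column(alias: str, key: str) -> str:
    """Render a read-only metadata column for a CREATE TABLE schema."""
    return f"{alias} {METADATA_TYPES[key]} METADATA FROM '{key}' VIRTUAL"


def op_ts_to_datetime(op_ts_millis: int) -> datetime:
    """Interpret an op_ts value (assumed epoch milliseconds) as a UTC datetime."""
    return EPOCH + timedelta(milliseconds=op_ts_millis)


for alias, key in [("db_name", "database_name"), ("coll_name", "collection_name"), ("operation_ts", "op_ts")]:
    print(metadata_column(alias, key))
print(op_ts_to_datetime(0))  # snapshot records carry 0, i.e. the epoch
```
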

BSON type | Flink SQL type |
---|---|
| | TINYINT |
| | SMALLINT |
Int | INT |
Long | BIGINT |
| | FLOAT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date, Timestamp | DATE |
Date, Timestamp | TIME |
Date | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | `ROW<$ref STRING, $id STRING>` |
GeoJSON | Point: `ROW<type STRING, coordinates ARRAY<DOUBLE>>`; Line: `ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>`; ... |
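
The two timestamp rows of the mapping differ in precision because of how BSON encodes them. A BSON Date is a signed 64-bit count of milliseconds since the Unix epoch, which fits `TIMESTAMP(3)`. A BSON Timestamp packs seconds since the epoch into its high 32 bits and an increment ordinal into its low 32 bits, so only second precision (`TIMESTAMP(0)`) carries over. The sketch below decodes both layouts with the standard library. It assumes the raw 64-bit integer values are already available, and the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_bson_date(millis: int) -> datetime:
    """BSON Date: signed 64-bit milliseconds since the Unix epoch (UTC)."""
    return EPOCH + timedelta(milliseconds=millis)


def decode_bson_timestamp(raw: int) -> Tuple[datetime, int]:
    """BSON Timestamp: high 32 bits = seconds since epoch, low 32 bits = increment."""
    seconds = (raw >> 32) & 0xFFFFFFFF
    increment = raw & 0xFFFFFFFF
    # Only whole seconds map to a point in time; the increment orders events within a second.
    return EPOCH + timedelta(seconds=seconds), increment


print(decode_bson_date(1_500))
print(decode_bson_timestamp((1_700_000_000 << 32) | 7))
```

The increment has no counterpart in `TIMESTAMP(0)`. This is why the mapping can carry the time of a BSON Timestamp but not its ordinal.
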