CLI tool for Kafka's messages migration. It can be useful for the next cases:
- You need to resend Kafka's messages to the same or another topic or even broker. And especially if you need to do it with filtration.
- You need to search some messages in concrete topic and store them.
- You need to send messages from stored backup or build them from provided data with template.
This app provides these possibilities.
The key features:
- Search messages with possibility to save them in JSON Line format
- Migrate messages between topics and clusters.
- Possibility to specify time range for speed up.
- Possibility to specify number of threads (consumers) for consuming.
- Backup messages and restore them from this backup.
The filtration mechanism is based on the cel-go package which is implementation of CEL spec with some additional extensions:
The CEL transform can be also useful for the building objects from the scratch, for example to build kafka messages for producing or supplying backup and restoring.
Examples:
- filter example: filter.txt
- migration transform: migration_transform.txt
- backup transform: store_transform.txt
- restore transform: restore_transform.txt
- build elements from scratch: data.jsonl and build.txt
Entrypoint source is self variable.
Kafka message format description:
{
"TopicPartition": {
"Topic": "<topic name>",
"Partition": 10,
"Offset": 0,
"Metadata": null,
"Error": null,
"LeaderEpoch": 0
},
"Value": "<bytes>",
"Key": "<bytes>",
"Timestamp": "2024-10-30T05:00:05.734+07:00",
"TimestampType": 1,
"Opaque": null,
"Headers": [
{
"Key": "<header name>",
"Value": "<bytes>"
}
],
"LeaderEpoch": 0
}Some additional functions for cel:
uuid()- generates random uuid (v4), also availableuuid(b'...'anduuid("...")uuid.v[1,4,6,7]()- generates uuid of specified version (uuid.v1(),uuid.v4(),uuid.v6()oruuid.v7())marschal(any)- marshal provided data to bytesunmarshal([]byte)- unmarshal bytes to data<timestamp>.unix()- get unix time in seconds<timestamp>.unixMilli()- get unix time in milliseconds<timestamp>.unixSubmilli()- get unix time in seconds with milliseconds as fractional part
By default, you can't to specify source as destination without direct allowance via special flag --leeroy=true.
Usage of kafkaquarius-current-linux:
migrate
search
produce
Usage of migrate:
-consumer-group string
required
-filter-file string
required, CEL filter
-leeroy
fatuity and courage
-since-time int
unix epoch time, 0 by default
-source-broker string
required
-source-topic string
required
-target-broker string
--source-broker is used if empty
-target-topic string
--source-topic is used if empty
-template-file string
optional, CEL transform
-threads-number int
(default 1)
-to-time int
unix epoch time, infinity by default
Usage of search:
-consumer-group string
required
-filter-file string
required, CEL filter
-output-file string
-since-time int
unix epoch time, 0 by default
-source-broker string
required
-source-topic string
required
-template-file string
optional, CEL transform
-threads-number int
(default 1)
-to-time int
unix epoch time, infinity by default
Usage of produce:
-filter-file string
optional, CEL filter
-source-file string
required, JSONL
-target-broker string
required
-target-topic string
required
-template-file string
required, CEL transform
./build/bin/kafkaquarius-current-linux search --consumer-group=kafkaquarius --source-broker=localhost:9092 --source-topic=test-topic --filter-file=examples/filter.txt --output-file=examples/out.jsonl --threads-number=10 --since-time=1735664400 --to-time=1738342800Total: 1000
Found: 5
Proc: 5
Errors: 0
Time: 1m1s
./build/bin/kafkaquarius-current-linux migrate --consumer-group=kafkaquarius --source-broker=localhost:9092 --source-topic=test-topic --target-topic=target-test-topic --filter-file=examples/filter.txt --threads-number=10 --since-time=1735664400 --to-time=1738342800Total: 1000
Found: 5
Proc: 5
Errors: 0
Time: 1m1s
./build/bin/kafkaquarius-current-linux produce --target-broker=localhost:9094 --target-topic=some-target-topic --template-file=examples/restore_transform.txt --source-file=examples/out.jsonl