This mini project demonstrates a real-time data pipeline using Apache Kafka. A custom Python producer continuously monitors a MySQL table and publishes new records to a Kafka topic as soon as they are inserted. On the other side, a consumer subscribes to this topic, processes the incoming records, and writes them into separate JSON files for storage or further use. The project also highlights Kafka’s partition–consumer mapping feature, showing that a single partition can only be consumed by one consumer instance within a consumer group.
- Python 3.7 or later
- Confluent Kafka Python client
- MySQL Database
- Apache Avro
- A suitable IDE for coding (e.g., PyCharm, Visual Studio Code)
Creating the SQL Table
First, we have to create our database and table in MySQL. You can find the schema in the respective SQL file here: Table Schema
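For reference, here is a minimal sketch of what creating the database and table could look like with mysql-connector-python. The database name, table name, column names, and types are assumptions; the actual schema is the one in the linked SQL file.

```python
import mysql.connector

# Connect to a local MySQL server (credentials are placeholders).
conn = mysql.connector.connect(host="localhost", user="root", password="password")
cursor = conn.cursor()

cursor.execute("CREATE DATABASE IF NOT EXISTS kafka_pipeline")
cursor.execute("USE kafka_pipeline")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS product (
        id         INT AUTO_INCREMENT PRIMARY KEY,
        name       VARCHAR(255) NOT NULL,
        category   VARCHAR(100),
        price      DECIMAL(10, 2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
conn.commit()
cursor.close()
conn.close()
```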
Creating the Kafka Producer code
The Python script keeps track of the last read timestamp and ID of the records in the MySQL table and fetches only new records; the query it uses is illustrated in the sketch below.
The records are serialized in Avro format and published to a Kafka topic named product_updates. The ID column is used as the key for partitioning and is string-serialized. The script publishes records every 1 second and polls MySQL for new records every 3 seconds. You can find the respective Python producer file here.
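Below is a minimal producer sketch, assuming a Confluent Cloud cluster with Schema Registry, the mysql-connector-python client, and the hypothetical product table from the previous section. The Avro schema, credentials, column names, and incremental-fetch query are placeholders rather than the project's actual values.

```python
import time
import mysql.connector
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Assumed Avro schema matching the hypothetical product table above.
VALUE_SCHEMA = """
{
  "type": "record",
  "name": "ProductUpdate",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "category", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "created_at", "type": "string"}
  ]
}
"""

schema_registry = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",
    "basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",
})

producer = SerializingProducer({
    "bootstrap.servers": "<bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<cluster-api-key>",
    "sasl.password": "<cluster-api-secret>",
    "key.serializer": StringSerializer("utf_8"),            # ID column, string-serialized
    "value.serializer": AvroSerializer(schema_registry, VALUE_SCHEMA),
})

# autocommit so every poll sees rows inserted after the previous one.
db = mysql.connector.connect(host="localhost", user="root", password="password",
                             database="kafka_pipeline", autocommit=True)

last_id, last_ts = 0, "1970-01-01 00:00:00"   # high-water marks for incremental fetches

while True:
    cursor = db.cursor(dictionary=True)
    # Fetch only records newer than the ones already published.
    cursor.execute(
        "SELECT id, name, category, price, created_at FROM product "
        "WHERE created_at > %s OR (created_at = %s AND id > %s) "
        "ORDER BY created_at, id",
        (last_ts, last_ts, last_id),
    )
    for row in cursor.fetchall():
        row["price"] = float(row["price"])          # DECIMAL -> Avro double
        row["created_at"] = str(row["created_at"])  # TIMESTAMP -> Avro string
        producer.produce(topic="product_updates", key=str(row["id"]), value=row)
        last_id, last_ts = row["id"], row["created_at"]
    cursor.close()
    producer.poll(1)   # serve delivery callbacks roughly every second
    time.sleep(3)      # poll MySQL for new records every 3 seconds
```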
Creating and Configuring the Kafka Cluster and Topic
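The cluster and topic can be set up from the Kafka/Confluent UI, but as a sketch the topic can also be created programmatically with the Confluent Kafka Python client's AdminClient. The broker address, credentials, and partition count below are assumptions; three partitions is simply consistent with running three consumers later.

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({
    "bootstrap.servers": "<bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<cluster-api-key>",
    "sasl.password": "<cluster-api-secret>",
})

# Three partitions so that up to three consumers in one group can read in parallel.
futures = admin.create_topics(
    [NewTopic("product_updates", num_partitions=3, replication_factor=3)]
)
for topic, future in futures.items():
    future.result()   # raises if the topic could not be created
    print(f"Created topic {topic}")
```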
Creating the Kafka Consumer code
The consumer Python script fetches data from the Kafka topic and deserializes it from Avro into Python objects. It also applies a small transformation to the records: a discount is applied to products that belong to a certain category. The objects are then converted to JSON format and appended to a JSON file. You can find the three users JSON files here.
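A minimal consumer sketch follows, under the same assumptions as the producer sketch above; the consumer group name, output file name, and the 10% discount rate are illustrative placeholders.

```python
import json
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",
    "basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",
})

consumer = DeserializingConsumer({
    "bootstrap.servers": "<bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<cluster-api-key>",
    "sasl.password": "<cluster-api-secret>",
    "group.id": "product_updates_group",    # all three consumer scripts share this group
    "auto.offset.reset": "earliest",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": AvroDeserializer(schema_registry),
})
consumer.subscribe(["product_updates"])

OUTPUT_FILE = "users1.json"   # each consumer instance writes to its own file

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    record = msg.value()
    # Small transformation: apply an (assumed) 10% discount to the "books" category.
    if record.get("category", "").lower() == "books":
        record["price"] = round(record["price"] * 0.9, 2)
    # Append the transformed record as a JSON line to this consumer's output file.
    with open(OUTPUT_FILE, "a") as f:
        f.write(json.dumps(record) + "\n")
```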
In the picture above, the discount was applied to the second and third records since their category was books. I ran 3 consumer scripts in parallel, each writing to a different JSON file. If you look at the JSON files, no record belongs to more than one file. This demonstrates a Kafka property: one partition in a Kafka topic can be consumed by only one consumer instance within a consumer group.
- Create the appropriate MySQL table
- Create and configure the Kafka cluster and topic
- Create and set the API keys
- Run the producer and consumer code
- Insert records into the SQL table
- And watch it unfold.
If you want to see a video of this mini project, click here.