
Apache Flink ETL Pipeline - Kafka to Parquet

A production-ready Apache Flink streaming ETL pipeline that reads from Kafka, transforms data, and writes to Parquet format. Built with PyFlink Table API for high-level stream processing.

🚀 Quick Start

# Install dependencies
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

# Run the pipeline
python main.py pipeline flink_kafka_to_parquet \
    topic=last9Topic \
    sink_path=file://$(pwd)/parquet_output

📋 Table of Contents

  • Quick Start
  • Architecture Overview
  • Main Components
  • Installation
  • Flink Concepts Explained
  • Testing
  • Configuration
  • Usage Examples
  • Monitoring & Debugging
  • Production Deployment
  • Learn More
  • Troubleshooting
  • Contributing
  • License

πŸ—οΈ Architecture Overview

CLI Entry Point (main.py)
    ↓
Pipeline Registry (cli_runner.py)  
    ↓
Pipeline Classes (flink_kafka_to_parquet.py)
    ├── Extractor (Kafka Source)
    ├── Transformer (Nested Field Extraction)
    └── Sink (Parquet Writer)
    ↓
Flink TableEnvironment (Streaming Execution)

📦 Main Components

1. main.py - CLI Entry Point

The main entry point provides two execution modes:

Pipeline Mode (Primary Use Case)

python main.py pipeline <pipeline_name> [parameters...]

Key Functions:

  • parse_pipeline_args(): Parses CLI arguments into pipeline name and parameters
  • Delegates to cli_runner for pipeline execution

Example:

python main.py pipeline flink_kafka_to_parquet \
    topic=myTopic \
    sink_path=file:///tmp/output
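
The README does not reproduce the parser itself; the sketch below shows one way key=value arguments could be split into a pipeline name and a parameter dict (illustrative only, the real logic lives in main.py):

import sys


def parse_pipeline_args(argv):
    """Illustrative sketch: split CLI args into a pipeline name and key=value params."""
    pipeline_name, *raw_params = argv
    params = {}
    for item in raw_params:
        key, _, value = item.partition("=")
        params[key] = value
    return pipeline_name, params


if __name__ == "__main__":
    # e.g. python main.py pipeline flink_kafka_to_parquet topic=myTopic sink_path=file:///tmp/output
    name, params = parse_pipeline_args(sys.argv[2:])
    print(name, params)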

DAG Mode (Complex Workflows)

python main.py dag <workflow.yaml>

Executes multiple pipelines with dependencies.

2. Pipeline System

Base Pipeline (pipeline/base_pipeline.py)

Abstract base class defining the ETL pipeline interface:

from abc import ABC, abstractmethod

class AbstractPipeline(ABC):
    @abstractmethod
    def run(self) -> None: pass

    @classmethod
    @abstractmethod
    def build(cls, **kwargs): pass

Flink Kafka to Parquet Pipeline (pipeline/flink_kafka_to_parquet.py)

Main streaming pipeline implementation using Flink Table API:

  • Creates TableEnvironment for streaming mode
  • Configures checkpointing for fault tolerance
  • Orchestrates the Extractor → Transformer → Sink flow (see the sketch below)
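
A minimal sketch of how these pieces might be wired together (the class and method names other than AbstractPipeline are illustrative; the actual orchestration lives in pipeline/flink_kafka_to_parquet.py):

from pyflink.table import EnvironmentSettings, TableEnvironment

from pipeline.base_pipeline import AbstractPipeline  # assumed import path


class FlinkKafkaToParquetPipeline(AbstractPipeline):
    """Illustrative sketch: Kafka source -> SQL view -> Parquet sink."""

    def __init__(self, extractor, transformer, sink):
        self.extractor = extractor
        self.transformer = transformer
        self.sink = sink

    @classmethod
    def build(cls, **kwargs):
        # The real pipeline builds its components from CLI parameters / config.
        return cls(kwargs["extractor"], kwargs["transformer"], kwargs["sink"])

    def run(self) -> None:
        settings = EnvironmentSettings.in_streaming_mode()
        t_env = TableEnvironment.create(settings)
        t_env.get_config().get_configuration().set_string(
            "execution.checkpointing.interval", "10 s"
        )
        # Each component registers its DDL/view on the shared TableEnvironment
        # (hypothetical register() methods).
        self.extractor.register(t_env)    # CREATE TABLE kafka_logs ...
        self.transformer.register(t_env)  # CREATE VIEW logs_enriched ...
        self.sink.register(t_env)         # CREATE TABLE parquet_sink ...
        # Submit the continuous INSERT job and block until it is cancelled.
        t_env.execute_sql("INSERT INTO parquet_sink SELECT * FROM logs_enriched").wait()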

3. Extractors (Data Sources)

Flink Kafka Source (extractor/flink_kafka_extractor.py)

Creates Kafka source table with:

  • SASL/SSL authentication for Confluent Cloud
  • JSON format parsing
  • Configurable scan modes (earliest/latest offset)

Flink SQL DDL Example:

CREATE TABLE kafka_logs (
    `timestamp` STRING,
    serviceName STRING,
    severityText STRING,
    attributes MAP<STRING, STRING>,
    resources MAP<STRING, STRING>,
    body STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'last9Topic',
    'properties.bootstrap.servers' = 'broker:9092',
    'format' = 'json'
)
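
The DDL above omits authentication for brevity. For Confluent Cloud, the connector's properties.* pass-through options carry the SASL/SSL settings, and scan.startup.mode selects where to start reading. A hedged sketch of how the extractor could assemble them (the exact wiring in extractor/flink_kafka_extractor.py may differ):

def kafka_auth_options(username: str, password: str) -> dict:
    """Illustrative extra WITH options for SASL_SSL against Confluent Cloud."""
    return {
        "properties.security.protocol": "SASL_SSL",
        "properties.sasl.mechanism": "PLAIN",
        "properties.sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{username}" password="{password}";'
        ),
        "scan.startup.mode": "earliest-offset",
    }


def render_with_options(options: dict) -> str:
    """Render the options as the body of a Flink SQL WITH (...) clause."""
    return ",\n".join(f"    '{key}' = '{value}'" for key, value in options.items())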

4. Transformers (Data Processing)

Log Transform (transformer/flink_log_transform.py)

Creates SQL view for data transformation:

  • Extracts nested JSON fields
  • Creates computed columns
  • Handles null values with COALESCE

Transformation SQL:

CREATE VIEW logs_enriched AS
SELECT
    `timestamp`,
    serviceName,
    severityText,
    attributes['msg'] AS msg,
    attributes['url'] AS url,
    COALESCE(JSON_VALUE(body, '$.data.mobile'), attributes['mobile']) AS mobile,
    attributes,
    resources,
    body
FROM kafka_logs
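
A hedged sketch of how the transformer might register this view on the TableEnvironment (the class shape is illustrative; the real code is in transformer/flink_log_transform.py):

from pyflink.table import TableEnvironment

# The CREATE VIEW statement shown above, kept as a constant.
LOGS_ENRICHED_VIEW = (
    "CREATE VIEW logs_enriched AS "
    "SELECT `timestamp`, serviceName, severityText, "
    "attributes['msg'] AS msg, attributes['url'] AS url, "
    "COALESCE(JSON_VALUE(body, '$.data.mobile'), attributes['mobile']) AS mobile, "
    "attributes, resources, body "
    "FROM kafka_logs"
)


class LogTransformer:
    """Illustrative sketch: register the enrichment view for downstream sinks."""

    def register(self, t_env: TableEnvironment) -> None:
        t_env.execute_sql(LOGS_ENRICHED_VIEW)

Note that JSON_VALUE returns NULL when the JSON path is absent, so the COALESCE falls back to attributes['mobile'].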

5. Sinks (Data Destinations)

Parquet Sink (sink/flink_parquet_sink.py)

Creates filesystem sink with:

  • Parquet format output
  • Configurable rolling policies
  • File size and time-based rolling

Sink Configuration:

CREATE TABLE parquet_sink (
    -- columns
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/output',
    'format' = 'parquet',
    'sink.rolling-policy.file-size' = '1KB',
    'sink.rolling-policy.rollover-interval' = '10s'
)
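
The column list is elided above. A hedged sketch of how the sink DDL could be rendered from the sink_path parameter, with a schema mirroring the logs_enriched view (the real schema in sink/flink_parquet_sink.py may differ):

def build_parquet_sink_ddl(sink_path: str) -> str:
    """Illustrative: render the filesystem sink DDL for a given sink_path."""
    return f"""
    CREATE TABLE parquet_sink (
        `timestamp` STRING,
        serviceName STRING,
        severityText STRING,
        msg STRING,
        url STRING,
        mobile STRING,
        attributes MAP<STRING, STRING>,
        resources MAP<STRING, STRING>,
        body STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{sink_path}',
        'format' = 'parquet',
        'sink.rolling-policy.file-size' = '1KB',
        'sink.rolling-policy.rollover-interval' = '10s'
    )
    """

Because Parquet is a bulk format, in-progress part files only become visible after a checkpoint completes, so the rolling policy works together with the checkpoint interval.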

πŸ› οΈ Installation

Prerequisites

  • Python: 3.11.9 (tested version)
  • Java: 11 or higher (required by Apache Flink)
  • Memory: Minimum 4GB RAM recommended
  • Kafka Access: Confluent Cloud, AWS MSK, or RedPanda cluster

Setup Steps

  1. Create Virtual Environment:
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
  2. Install Python Dependencies:
pip install -r requirements.txt

Exact Python Dependencies:

apache-flink==1.20.0          # PyFlink framework
confluent-kafka==2.11.0       # Kafka client library
python-dotenv==1.1.1          # Environment configuration
PyYAML==6.0.2                 # YAML configuration support
pyarrow==11.0.0               # Parquet file format support
pandas==2.0.3                 # Data manipulation
pytest==8.4.1                 # Testing framework (dev dependency)
  3. Download Required JAR Files:
python download_jars.py

Complete JAR Dependencies with Exact Versions:

Component              JAR File                          Version     Purpose

Flink Connectors
Kafka Connector        flink-sql-connector-kafka.jar     3.3.0-1.20  Kafka source/sink
Parquet Connector      flink-sql-parquet.jar             1.20.0      Parquet file format

Hadoop Ecosystem
Hadoop Common          hadoop-common.jar                 3.3.4       HDFS/filesystem operations
Hadoop MapReduce       hadoop-mapreduce-client-core.jar  3.3.4       MapReduce client
Hadoop Shaded Guava    hadoop-shaded-guava.jar           1.1.1       Guava for Hadoop

Supporting Libraries
Google Guava           guava.jar                         31.1.0      Core utilities
Commons Configuration  commons-configuration2.jar        2.8.0       Configuration management
Commons Text           commons-text.jar                  1.9         Text processing utilities
Stax2 API              stax2-api.jar                     4.2.1       XML processing API
Woodstox Core          woodstox-core.jar                 6.4.0       XML processing implementation

Total JAR Size: ~30MB (10 JAR files)

  4. Verify Installation:
# Check Python and dependencies
python --version  # Should show 3.11.9
python -c "import pyflink; print(pyflink.__version__)"  # Should show 1.20.0

# Check JAR files
ls -la jars/  # Should list 10 JAR files

# Run tests to verify setup
python -m pytest tests/ -v

📊 Flink Concepts Explained

Table API & SQL

Our pipeline uses Flink's Table API, which provides:

  • High-level abstraction over DataStream API
  • SQL-like operations for data processing
  • Unified batch/streaming processing model

Streaming Execution

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create streaming environment
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Configure checkpointing for fault tolerance
t_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "10 s"
)

Connectors

Flink uses connectors to interact with external systems:

Kafka Connector:

  • Reads unbounded streams from Kafka topics
  • Supports exactly-once semantics with checkpointing
  • Handles authentication (SASL/SSL)

Filesystem Connector:

  • Writes to various file formats (Parquet, CSV, JSON)
  • Implements rolling file policies
  • Supports partitioned writes (see the example below)
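
Partitioned writes are not enabled in the default pipeline; a hedged example of what a date-partitioned Parquet sink could look like (the dt column and commit policy are illustrative):

# Illustrative date-partitioned Parquet sink; dt is a hypothetical partition column.
PARTITIONED_SINK_DDL = """
CREATE TABLE parquet_sink_partitioned (
    `timestamp` STRING,
    serviceName STRING,
    body STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///tmp/last9_parquet',
    'format' = 'parquet',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.policy.kind' = 'success-file'
)
"""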

Checkpointing & Fault Tolerance

  • Checkpoints: Periodic snapshots of streaming state
  • Recovery: Automatic recovery from failures
  • Exactly-once: Guarantees no data loss or duplication
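
A hedged example of the related Flink configuration keys (values are illustrative; tune them to your data volume):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
config = t_env.get_config().get_configuration()

# Snapshot streaming state every 10 seconds.
config.set_string("execution.checkpointing.interval", "10 s")
# Exactly-once checkpointing mode (Flink's default).
config.set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
# Where completed checkpoints are stored and recovered from after a failure.
config.set_string("state.checkpoints.dir", "file:///tmp/flink-checkpoints")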

🧪 Testing

Run tests with pytest:

# Run all tests
python -m pytest tests/ -v

# Run specific test file
python -m pytest tests/test_main.py -v

# Run with coverage (requires the pytest-cov plugin)
python -m pytest tests/ --cov=. --cov-report=html

Test organization:

  • tests/test_main.py - Entry point and CLI tests
  • tests/test_pipeline.py - Pipeline logic tests
  • tests/test_extractors.py - Source connector tests
  • tests/test_transformers.py - Transformation logic tests
  • tests/test_sinks.py - Sink connector tests

βš™οΈ Configuration

Environment Configuration

Configure credentials in .env.dev file (for development):

# Development Environment Configuration for Kafka
BOOTSTRAP_SERVERS=pkc-xxxxx.us-east-2.aws.confluent.cloud:9092
SASL_USERNAME=YOUR_API_KEY
SASL_PASSWORD=YOUR_API_SECRET
KAFKA_TOPIC=last9Topic
SINK_PATH=file:///tmp/last9_parquet

Security Notes:

  • .env.dev is already in .gitignore and won't be committed
  • Never commit credentials to version control
  • Use different .env files for different environments (dev, staging, prod)

Kafka Configuration

The config/properties/confluent.properties file uses environment-variable placeholders that are resolved at runtime:

bootstrap.servers=${BOOTSTRAP_SERVERS}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=${SASL_USERNAME}
sasl.password=${SASL_PASSWORD}
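
The substitution code itself is not shown in this README; a minimal sketch of resolving the ${VAR} placeholders with python-dotenv (the helper name and file locations are illustrative):

import os
from string import Template

from dotenv import load_dotenv


def load_kafka_properties(path="config/properties/confluent.properties"):
    """Illustrative: read the properties file and substitute ${VAR} placeholders."""
    load_dotenv(".env.dev")  # exports BOOTSTRAP_SERVERS, SASL_USERNAME, ...
    props = {}
    with open(path) as handle:
        for line in handle:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            props[key] = Template(value).safe_substitute(os.environ)
    return props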

Pipeline Parameters

Common parameters for flink_kafka_to_parquet:

  • topic - Kafka topic to consume
  • sink_path - Output path for Parquet files
  • bootstrap_servers - Kafka brokers (optional, uses config)
  • scan_startup_mode - earliest-offset or latest-offset

πŸ“ Usage Examples

Basic Kafka to Parquet

python main.py pipeline flink_kafka_to_parquet \
    topic=events \
    sink_path=file:///data/events_parquet

With Custom Configuration

python main.py pipeline flink_kafka_to_parquet \
    topic=logs \
    sink_path=file:///data/logs \
    scan_startup_mode=latest-offset \
    checkpoint_interval=30s

DAG Workflow

Create workflow.yaml:

max_workers: 2
steps:
  - name: extract_data
    pipeline: kafka_extractor
    params:
      topic: raw_events
  - name: transform_data  
    pipeline: data_transformer
    depends_on: [extract_data]
  - name: load_data
    pipeline: parquet_loader
    depends_on: [transform_data]

Run workflow:

python main.py dag workflow.yaml
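
The DAG runner is not documented in this README; the sketch below shows one way depends_on ordering with a bounded worker pool could work (illustrative only, not the repository's actual scheduler):

from concurrent.futures import ThreadPoolExecutor

import yaml


def run_dag(path, run_pipeline):
    """Illustrative: execute ready steps in parallel waves, respecting depends_on."""
    spec = yaml.safe_load(open(path))
    steps = {step["name"]: step for step in spec["steps"]}
    done = set()
    with ThreadPoolExecutor(max_workers=spec.get("max_workers", 2)) as pool:
        while len(done) < len(steps):
            ready = [
                step for name, step in steps.items()
                if name not in done and set(step.get("depends_on", [])) <= done
            ]
            if not ready:
                raise ValueError("cycle or unknown dependency in workflow")
            futures = [
                pool.submit(run_pipeline, step["pipeline"], step.get("params", {}))
                for step in ready
            ]
            for future in futures:
                future.result()
            done.update(step["name"] for step in ready)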

πŸ” Monitoring & Debugging

Console Output

The pipeline prints data flow information:

🚀 Starting pipeline: flink_kafka_to_parquet
📋 Parameters: {'topic': 'last9Topic', 'sink_path': 'file:///tmp/output'}
KafkaData>:6> +I[2025-08-06T09:12:30, user-service, DEBUG, {...}]

Output Files

Parquet files are created with rolling policies:

parquet_output/
├── part-xxx-4-0 (3.4KB)
├── part-xxx-5-0 (4.4KB)
└── part-xxx-6-0 (3.9KB)

Checkpoints

Monitor checkpointing in /tmp/flink-checkpoints/

🚀 Production Deployment

Best Practices

  1. Resource Allocation: Set appropriate parallelism
  2. Checkpointing: Configure based on data volume
  3. Monitoring: Use Flink metrics and logging
  4. Error Handling: Implement proper retry logic
  5. Schema Evolution: Plan for data schema changes

Performance Tuning

  • Adjust checkpoint_interval based on throughput
  • Configure rolling_policy for optimal file sizes
  • Set an appropriate watermark_delay for late data (see the example below)
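
A hedged example of where these knobs live (the values are illustrative starting points, not the project's defaults):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Checkpoint less often on high-throughput topics to reduce overhead.
t_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "60 s")

# On the sink DDL, larger rolling thresholds avoid many small Parquet files:
#   'sink.rolling-policy.file-size' = '128MB',
#   'sink.rolling-policy.rollover-interval' = '15 min'

# If event-time processing is added, a watermark on the source DDL bounds late data
# (requires a TIMESTAMP column):
#   event_time TIMESTAMP(3),
#   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND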

📚 Learn More

🔧 Troubleshooting

Version Compatibility Issues

JAR Version Mismatches:

# If you see ClassNotFoundException or NoSuchMethodError
# Verify all JARs are compatible with Flink 1.20
ls -la jars/
python download_jars.py  # Re-download if needed

Python Version Issues:

# If PyFlink fails to import
python --version  # Must be 3.11.9 or compatible
pip install --upgrade apache-flink==1.20.0

Memory Issues:

# If Flink jobs fail with OutOfMemoryError
export FLINK_ENV_JAVA_OPTS="-Xmx2g -Xms1g"

Version Verification Commands

# Quick health check
python -c "
import sys
print(f'Python: {sys.version}')
import pyflink
print(f'PyFlink: {pyflink.__version__}')
import pandas as pd
print(f'Pandas: {pd.__version__}')
import pyarrow as pa
print(f'PyArrow: {pa.__version__}')
"

# JAR verification
find jars -name "*.jar" -exec basename {} \; | sort

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Submit a pull request

📄 License

This project is licensed under the MIT License.
