FlakeRunner is a Go-based data orchestration framework that manages data processing workflows by coordinating S3 file operations with EMR Serverless PySpark jobs. It provides prefix-based routing to determine target Snowflake tables and processing configurations, with comprehensive state management through DynamoDB.
- S3-EMR Serverless Integration: Seamlessly orchestrate PySpark jobs on EMR Serverless
- Prefix-Based Table Mapping: Route S3 files to appropriate Snowflake tables based on prefixes
- Control File Validation: Ensure data integrity with metadata validation and checksums
- State Management: Track processing states in DynamoDB with full audit trails
- Fluent API Design: Chain operations for clean, readable workflows
- Comprehensive Error Handling: Built-in retry logic and dead letter queue support
- AWS Resource Management: Create and manage required AWS infrastructure
S3 Input → Validation → EMR Serverless Processing → S3 Output → Snowflake
                │                        │
         DynamoDB ───── State Tracking ───── CloudWatch Logs
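The routing step in that flow is a prefix lookup: the key of each incoming S3 object is compared against the configured prefixes to choose a Snowflake target table. Below is a minimal Go sketch of that lookup; the function name and the longest-match behaviour are illustrative assumptions, not FlakeRunner's actual API.

package routing

import "strings"

// TargetFor resolves an S3 object key to its Snowflake target table by
// longest-prefix match, so a more specific mapping such as "customers/vip/"
// would win over "customers/". Whether FlakeRunner prefers the longest or the
// first configured match is not documented; longest-match is assumed here.
func TargetFor(key string, prefixes map[string]string) (string, bool) {
    bestPrefix, target := "", ""
    for prefix, table := range prefixes {
        if strings.HasPrefix(key, prefix) && len(prefix) > len(bestPrefix) {
            bestPrefix, target = prefix, table
        }
    }
    return target, bestPrefix != ""
}

// Example: TargetFor("customers/customer_data.csv",
//     map[string]string{"customers/": "CUSTOMERS", "orders/": "ORDERS"})
// returns ("CUSTOMERS", true).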
# Clone the repository
git clone https://github.com/allen13/flake-runner.git
cd flake-runner
# Build the binary
go build -o flake-runner ./cmd
# Or install directly
go install github.com/allen13/flake-runner/cmd@latest

Create a configuration file (config.json) with your AWS resources and prefix mappings:
{
"aws_profile": "default",
"aws_region": "us-east-1",
"input_bucket_name": "my-data-input",
"output_bucket_name": "my-data-output",
"staging_bucket_name": "my-data-staging",
"control_table_name": "file-orchestrations-prod",
"emr_application_id": "00abc123def456gh",
"emr_execution_role_arn": "arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
"job_timeout_minutes": 30,
"max_retries": 3,
"control_ttl_days": 7,
"prefix_mappings": [
{
"s3_prefix": "customers/",
"target_name": "CUSTOMERS",
"entry_point": "s3://my-staging/scripts/customer_processor.py",
"processing_config": {
"file_format": "CSV",
"compression_type": "GZIP",
"max_file_size": 5368709120
},
"validation_rules": {
"validate_record_count": true,
"validate_file_size": true,
"validate_checksum": true,
"required_fields": ["customer_id", "email", "name"]
}
}
]
}
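For reference, the configuration above maps naturally onto Go structs with JSON tags. This is a hypothetical sketch of how a consumer might load it; field names follow the JSON keys, but FlakeRunner's own config types may differ.

package config

import (
    "encoding/json"
    "os"
)

// Config mirrors the JSON example above. Treat this as a sketch, not the
// framework's actual type definitions.
type Config struct {
    AWSProfile          string          `json:"aws_profile"`
    AWSRegion           string          `json:"aws_region"`
    InputBucketName     string          `json:"input_bucket_name"`
    OutputBucketName    string          `json:"output_bucket_name"`
    StagingBucketName   string          `json:"staging_bucket_name"`
    ControlTableName    string          `json:"control_table_name"`
    EMRApplicationID    string          `json:"emr_application_id"`
    EMRExecutionRoleARN string          `json:"emr_execution_role_arn"`
    JobTimeoutMinutes   int             `json:"job_timeout_minutes"`
    MaxRetries          int             `json:"max_retries"`
    ControlTTLDays      int             `json:"control_ttl_days"`
    PrefixMappings      []PrefixMapping `json:"prefix_mappings"`
}

// PrefixMapping ties an S3 prefix to a target table, a PySpark entry point,
// and its processing/validation settings (kept loosely typed here).
type PrefixMapping struct {
    S3Prefix         string          `json:"s3_prefix"`
    TargetName       string          `json:"target_name"`
    EntryPoint       string          `json:"entry_point"`
    ProcessingConfig json.RawMessage `json:"processing_config"`
    ValidationRules  json.RawMessage `json:"validation_rules"`
}

// LoadConfig reads and parses a configuration file such as config.json.
func LoadConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var cfg Config
    if err := json.Unmarshal(data, &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}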
# Validate configuration and AWS resources
$ flake-runner init -config production.json
Flake Runner initialized successfully!
Job ID: job-a1b2c3d4
Configured with 3 prefix mappings:
  • customers/ → CUSTOMERS
  • orders/ → ORDERS
  • products/ → PRODUCTS
Ready to process files!

# Process a single file
$ flake-runner process -file s3://my-bucket/customers/customer_data.csv
Processing file: s3://my-bucket/customers/customer_data.csv
File processing initiated successfully!
Job ID: job-a1b2c3d4
# Process with control data validation
$ flake-runner process -file s3://my-bucket/customers/data.csv \
-control-data '{"file_name":"data.csv","file_size":1024,"file_hash":"abc123","record_count":100}'
Processing file: s3://my-bucket/customers/data.csv
Using provided control data for validation
File processing initiated successfully!
Job ID: job-b2c3d4e5
# Process and wait for completion
$ flake-runner process -file s3://my-bucket/orders/orders_2024.csv -wait
Processing file: s3://my-bucket/orders/orders_2024.csv
File processing initiated successfully!
Job ID: job-c3d4e5f6
Waiting for job completion (timeout: 120 minutes, poll interval: 30 seconds)...
Starting job polling at 14:23:45
[0s] State: PROCESSING - EMR Serverless Processing
     EMR Job: RUNNING
[30s] State: PROCESSING - EMR Serverless Processing
     EMR Job: SUCCESS
EMR job completed successfully, updating orchestration state...
Orchestration state updated to PROCESSED
[31s] State: PROCESSED - Processing Completed Successfully
Job processing completed successfully in 31s!
Job Summary:
File: orders_2024.csv
Target Table: ORDERS
Batch ID: batch_20240107_001
Started: 2024-01-07 14:23:45
Completed: 2024-01-07 14:24:16
Duration: 31s
Validation Results:
Expected Records: 50000
Actual Records: 50000
Record Count Match: true
File Size Match: true
Checksum Match: true
EMR Job Details:
Job Run ID: 00f1qa2lmnmnhj1n
Status: SUCCESS
Processed Records: 50000
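The -wait flag behaves like a polling loop over the EMR Serverless job run. Here is a rough sketch of such a loop using the AWS SDK for Go v2; the 30-second interval matches the output above, while FlakeRunner's own orchestration-state updates in DynamoDB are omitted and the function name is illustrative.

package poll

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/emrserverless"
)

// WaitForJobRun polls an EMR Serverless job run until it reaches a terminal
// state or the timeout elapses.
func WaitForJobRun(ctx context.Context, client *emrserverless.Client, appID, jobRunID string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        out, err := client.GetJobRun(ctx, &emrserverless.GetJobRunInput{
            ApplicationId: aws.String(appID),
            JobRunId:      aws.String(jobRunID),
        })
        if err != nil {
            return "", err
        }
        state := string(out.JobRun.State)
        switch state {
        case "SUCCESS", "FAILED", "CANCELLED":
            return state, nil // terminal states
        }
        select {
        case <-ctx.Done():
            return state, fmt.Errorf("timed out waiting for job run %s: %w", jobRunID, ctx.Err())
        case <-ticker.C:
        }
    }
}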
# Get status of a specific file
$ flake-runner status -file s3://my-bucket/customers/data.csv
File: s3://my-bucket/customers/data.csv
State: COMPLETED
Target: CUSTOMERS
Started: 2024-01-07 10:15:23
Completed: 2024-01-07 10:18:45
Duration: 3m22s
# Query files by state
$ flake-runner status -state FAILED
Found 2 files in FAILED state:
1. s3://my-bucket/orders/bad_file.csv
Failed at: 2024-01-07 09:45:12
Error: Record count validation failed: expected 1000, got 950
2. s3://my-bucket/products/corrupt.parquet
Failed at: 2024-01-07 11:23:45
Error: Parquet file corrupted: invalid magic bytes
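Querying by state maps to a DynamoDB query against the control table. The sketch below uses the AWS SDK for Go v2 and assumes a GSI named state-index and a file_key attribute; both are assumptions, and the real table layout may differ.

package status

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// ListByState returns the file keys of orchestrations currently in the given state.
func ListByState(ctx context.Context, client *dynamodb.Client, table, state string) ([]string, error) {
    out, err := client.Query(ctx, &dynamodb.QueryInput{
        TableName:                aws.String(table),
        IndexName:                aws.String("state-index"), // assumed GSI on the state attribute
        KeyConditionExpression:   aws.String("#st = :st"),
        ExpressionAttributeNames: map[string]string{"#st": "state"}, // "state" is a DynamoDB reserved word
        ExpressionAttributeValues: map[string]types.AttributeValue{
            ":st": &types.AttributeValueMemberS{Value: state},
        },
    })
    if err != nil {
        return nil, fmt.Errorf("query %s by state: %w", table, err)
    }

    var keys []string
    for _, item := range out.Items {
        if v, ok := item["file_key"].(*types.AttributeValueMemberS); ok { // "file_key" attribute is assumed
            keys = append(keys, v.Value)
        }
    }
    return keys, nil
}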
# View job logs
$ flake-runner emr -action logs -file s3://my-bucket/customers/data.csv
Fetching EMR job logs for: job-abc12345
Job Run ID: 00f1qa2lmnmnhj1n
[DRIVER] 2024-01-07 10:15:45 INFO Processing started for s3://my-bucket/customers/data.csv
[DRIVER] 2024-01-07 10:15:46 INFO Found 50000 records in CSV file
[DRIVER] 2024-01-07 10:15:47 INFO Applying transformations...
[DRIVER] 2024-01-07 10:16:23 INFO Writing output to s3://my-output/processed/CUSTOMERS/
[DRIVER] 2024-01-07 10:16:45 INFO Processing completed successfully
# Cancel a running job
$ flake-runner emr -action cancel -file s3://my-bucket/customers/large_file.csv
Job cancellation requested successfully
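Cancellation ultimately comes down to the EMR Serverless CancelJobRun API. A sketch of that call with the AWS SDK for Go v2, assuming the application and job run IDs have already been looked up from the orchestration record:

package emrjobs

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/emrserverless"
)

// Cancel asks EMR Serverless to stop a running job run. The CLI's
// "emr -action cancel" presumably issues an equivalent request after
// resolving the job run ID for the given file.
func Cancel(ctx context.Context, client *emrserverless.Client, appID, jobRunID string) error {
    _, err := client.CancelJobRun(ctx, &emrserverless.CancelJobRunInput{
        ApplicationId: aws.String(appID),
        JobRunId:      aws.String(jobRunID),
    })
    return err
}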
# List all AWS resources
$ flake-runner aws --action list
Listing AWS Resources...
Region: us-east-1
S3 Buckets:
  Input Bucket: my-data-input (accessible)
  Output Bucket: my-data-output (accessible)
  Staging Bucket: my-data-staging (accessible)
DynamoDB Tables:
  Control Table: file-orchestrations-prod (accessible)
EMR Serverless Applications:
  Application: 00abc123def456gh (accessible)
Resource listing complete
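The accessibility checks shown above can be reproduced with plain SDK calls: a bucket counts as accessible if HeadBucket succeeds. A sketch using the AWS SDK for Go v2 (the helper name and output format are illustrative, not the CLI's internal code):

package awscheck

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// CheckBuckets reports each configured bucket as accessible or not.
func CheckBuckets(ctx context.Context, profile, region string, buckets ...string) error {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithSharedConfigProfile(profile),
        config.WithRegion(region),
    )
    if err != nil {
        return err
    }
    client := s3.NewFromConfig(cfg)

    for _, name := range buckets {
        // HeadBucket succeeds only if the bucket exists and the caller can reach it.
        if _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(name)}); err != nil {
            fmt.Printf("Bucket %s: not accessible (%v)\n", name, err)
            continue
        }
        fmt.Printf("Bucket %s: accessible\n", name)
    }
    return nil
}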
# Create missing AWS resources
$ flake-runner aws --action create --create --force
Creating AWS Resources...
S3 Buckets:
  Input Bucket: my-data-input (already exists)
  Created Output Bucket: my-data-output
  Created Staging Bucket: my-data-staging
DynamoDB Tables:
  Created Control Table: file-orchestrations-prod
EMR Serverless Applications:
  Creating EMR Application with IAM Role: flake-runner-production...
  Created EMR Application: flake-runner-production
  Application ID: 00xyz789ghi012jk
  IAM Role ARN: arn:aws:iam::123456789012:role/EMRServerlessExecutionRole-flake-runner-production
Update your configuration file with these values:
  "emr_application_id": "00xyz789ghi012jk",
  "emr_execution_role_arn": "arn:aws:iam::123456789012:role/EMRServerlessExecutionRole-flake-runner-production"
Resource creation complete
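Creating the control table amounts to a DynamoDB CreateTable call plus enabling TTL so records expire after control_ttl_days. A sketch with the AWS SDK for Go v2; the file_key partition key and the ttl attribute name are assumptions, not FlakeRunner's documented schema.

package awscreate

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// CreateControlTable provisions an on-demand control table and enables TTL.
func CreateControlTable(ctx context.Context, client *dynamodb.Client, name string) error {
    _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
        TableName:   aws.String(name),
        BillingMode: types.BillingModePayPerRequest,
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("file_key"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("file_key"), KeyType: types.KeyTypeHash},
        },
    })
    if err != nil {
        return err
    }

    // Wait until the table is ACTIVE before enabling TTL.
    waiter := dynamodb.NewTableExistsWaiter(client)
    if err := waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: aws.String(name)}, 5*time.Minute); err != nil {
        return err
    }

    _, err = client.UpdateTimeToLive(ctx, &dynamodb.UpdateTimeToLiveInput{
        TableName: aws.String(name),
        TimeToLiveSpecification: &types.TimeToLiveSpecification{
            AttributeName: aws.String("ttl"), // assumed expiry attribute
            Enabled:       aws.Bool(true),
        },
    })
    return err
}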
# Upload files to S3
$ flake-runner aws --action upload --local customer_data.csv --file customers/upload_20240107.csv
Uploading file: customer_data.csv → customers/upload_20240107.csv
Successfully uploaded to: s3://my-data-input/customers/upload_20240107.csv
File size: 2483901 bytes
Target table: CUSTOMERS
Process with: flake-runner process --file s3://my-data-input/customers/upload_20240107.csv
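The upload itself is a standard S3 put; the important part is choosing a key under the right prefix so routing picks the intended table. A sketch using the AWS SDK for Go v2 transfer manager (the helper is illustrative, not the CLI's internal code):

package upload

import (
    "context"
    "fmt"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// Upload streams a local file to the input bucket under a key such as
// "customers/upload_20240107.csv".
func Upload(ctx context.Context, client *s3.Client, bucket, key, localPath string) error {
    f, err := os.Open(localPath)
    if err != nil {
        return err
    }
    defer f.Close()

    uploader := manager.NewUploader(client) // handles multipart uploads for large files
    out, err := uploader.Upload(ctx, &s3.PutObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Body:   f,
    })
    if err != nil {
        return err
    }
    fmt.Println("Uploaded to:", out.Location)
    return nil
}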
Control files provide metadata for validation:

{
"file_name": "customer_data.csv",
"file_size": 2483901,
"file_hash": "a1b2c3d4e5f6789012345678901234567890abcd",
"record_count": 50000,
"column_count": 12,
"created_at": "2024-01-07T10:00:00Z",
"batch_id": "batch_20240107_001"
}
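A control file can be generated alongside the data file. The sketch below derives the fields from a headered CSV; the use of SHA-1 is inferred from the 40-character hash in the example, and excluding the header row from record_count is an assumption.

package controlfile

import (
    "crypto/sha1"
    "encoding/csv"
    "encoding/hex"
    "io"
    "os"
    "time"
)

// ControlData mirrors the control-file JSON above; confirm the exact schema
// FlakeRunner expects before relying on these field names.
type ControlData struct {
    FileName    string `json:"file_name"`
    FileSize    int64  `json:"file_size"`
    FileHash    string `json:"file_hash"`
    RecordCount int    `json:"record_count"`
    ColumnCount int    `json:"column_count"`
    CreatedAt   string `json:"created_at"`
    BatchID     string `json:"batch_id"`
}

// Build computes size, SHA-1 hash, and record/column counts for a headered CSV.
func Build(path, batchID string) (*ControlData, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    info, err := f.Stat()
    if err != nil {
        return nil, err
    }

    h := sha1.New()
    r := csv.NewReader(io.TeeReader(f, h)) // hash the bytes while parsing rows
    records, columns := 0, 0
    for {
        row, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, err
        }
        if columns == 0 {
            columns = len(row) // header row defines the column count
            continue           // assumption: header excluded from record_count
        }
        records++
    }

    return &ControlData{
        FileName:    info.Name(),
        FileSize:    info.Size(),
        FileHash:    hex.EncodeToString(h.Sum(nil)),
        RecordCount: records,
        ColumnCount: columns,
        CreatedAt:   time.Now().UTC().Format(time.RFC3339),
        BatchID:     batchID,
    }, nil
}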
Files progress through the following states; a sketch of the transitions between them follows the list:

- INITIATED - File processing started
- VALIDATING - Validating file and control data
- VALIDATED - Validation completed successfully
- STAGING - Preparing files for processing
- STAGED - Files staged successfully
- PROCESSING - EMR Serverless job running
- PROCESSED - Processing completed successfully
- LOADING - Loading data to Snowflake
- LOADED - Data loaded successfully
- COMPLETED - All processing completed
- FAILED - Processing failed (with retry support)
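The happy path runs top to bottom through that list, with FAILED reachable from any stage. Below is a small Go sketch of the transition table; the retry edge from FAILED back to INITIATED is an assumption based on the max_retries setting.

package states

// Transitions captures the happy path through the states listed above, plus
// the shared failure edge.
var Transitions = map[string][]string{
    "INITIATED":  {"VALIDATING", "FAILED"},
    "VALIDATING": {"VALIDATED", "FAILED"},
    "VALIDATED":  {"STAGING", "FAILED"},
    "STAGING":    {"STAGED", "FAILED"},
    "STAGED":     {"PROCESSING", "FAILED"},
    "PROCESSING": {"PROCESSED", "FAILED"},
    "PROCESSED":  {"LOADING", "FAILED"},
    "LOADING":    {"LOADED", "FAILED"},
    "LOADED":     {"COMPLETED", "FAILED"},
    "COMPLETED":  {},
    "FAILED":     {"INITIATED"}, // assumed: retried files re-enter the pipeline
}

// CanTransition reports whether moving from one state to another is allowed.
func CanTransition(from, to string) bool {
    for _, next := range Transitions[from] {
        if next == to {
            return true
        }
    }
    return false
}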
Try the included demo to see FlakeRunner in action:
# Run the CSV to Parquet conversion demo
./demo_csv_to_parquet_full.sh
FlakeRunner Full End-to-End CSV to Parquet Demo
================================================
This demo showcases the complete FlakeRunner framework workflow:
• AWS resource creation (S3, DynamoDB, EMR Serverless)
• CSV file upload and validation
• Actual EMR job submission and processing
• Job monitoring and log retrieval
• Processed file download and verification

# Run tests
go test ./...
# Run linting
./scripts/go-lint.sh
# Build for different platforms
GOOS=linux GOARCH=amd64 go build -o flake-runner-linux ./cmd
GOOS=darwin GOARCH=amd64 go build -o flake-runner-darwin ./cmd
GOOS=windows GOARCH=amd64 go build -o flake-runner.exe ./cmd

MIT License - see LICENSE file for details.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
For issues, questions, or contributions, please open an issue on GitHub.