A production-ready data pipeline for entity matching, web data enrichment, and analytics visualization using Apache Airflow, PySpark, and Docker.
This platform enables organizations to:
- Match entities across different data sources using fuzzy matching algorithms
- Enrich data by scraping external sources for additional information
- Visualize results through an interactive analytics dashboard
- Track matching quality with comprehensive metrics and tracing
Perfect for:
- Customer data deduplication
- Third-party data integration
- Entity resolution across databases
- Data quality improvement initiatives
- Master data management (MDM)
```
┌───────────────────┐
│     Source A      │ (Internal Database)
│  (Primary Data)   │
└─────────┬─────────┘
          │
          ├─────────► ┌─────────────────────┐
          │           │  Data Preparation   │
          │           │  - Normalization    │
┌─────────┴─────────┐ │  - Deduplication    │
│     Source B      ├─┤  - Web Enrichment   │
│  (External Data)  │ └──────────┬──────────┘
└───────────────────┘            │
                                 │
                      ┌──────────▼──────────┐
                      │   Entity Matching   │
                      │  - Exact Matching   │
                      │  - LSH Fuzzy Match  │
                      │  - Multi-stage      │
                      └──────────┬──────────┘
                                 │
                      ┌──────────▼──────────┐
                      │    Visualization    │
                      │  - Match Analytics  │
                      │  - Quality Metrics  │
                      └─────────────────────┘
```
- Exact Matching: Fast matching on unique identifiers (tax IDs, registration numbers)
- Fuzzy Matching: LSH-based similarity matching for name fields
- Weighted Scoring: Configurable weights for different field importance
- Deduplication: Automatic removal of duplicate matches
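As a rough illustration of how weighted scoring can combine exact and fuzzy field comparisons (a sketch only, not the platform's actual matcher; the field names and weights here are assumptions mirroring the defaults shown later):

```python
from difflib import SequenceMatcher

# Hypothetical field weights; the real values live in the matching configuration
WEIGHTS = {"name": 0.45, "registration_id": 0.25, "tax_id": 0.25}

def field_similarity(a, b):
    """Exact match scores 1.0; otherwise fall back to a fuzzy ratio."""
    if not a or not b:
        return 0.0
    if a == b:
        return 1.0
    return SequenceMatcher(None, str(a).lower(), str(b).lower()).ratio()

def weighted_score(rec_a, rec_b, weights=WEIGHTS):
    """Combine per-field similarities into one normalized score."""
    total = sum(weights.values())
    score = sum(w * field_similarity(rec_a.get(f), rec_b.get(f))
                for f, w in weights.items())
    return score / total
```

A pair sharing exact identifiers but with slightly different names still scores high, which is the point of combining exact and fuzzy signals.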
- Configurable web scraping for external data sources
- Intelligent search strategies (by ID, name, etc.)
- Automatic data validation and cleaning
- Rate limiting and error handling
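The rate-limiting idea can be sketched as a minimal helper that enforces a pause between consecutive requests (an illustrative assumption, not the scraper module's actual API):

```python
import time

class RateLimiter:
    """Enforce a minimum delay between consecutive outbound requests."""

    def __init__(self, min_interval_s=1.0):
        self.min_interval_s = min_interval_s
        self._last_call = None

    def wait(self):
        # Sleep just long enough to honor the configured interval
        if self._last_call is not None:
            elapsed = time.monotonic() - self._last_call
            if elapsed < self.min_interval_s:
                time.sleep(self.min_interval_s - elapsed)
        self._last_call = time.monotonic()
```

Calling `wait()` before each HTTP request keeps the scraper polite regardless of how fast the surrounding loop runs.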
- Docker containerization for consistent deployment
- Apache Airflow for workflow orchestration
- PySpark for distributed data processing
- Incremental processing to avoid reprocessing
- Tracing & auditing of all matching attempts
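Incremental processing can be approximated by persisting the set of already-processed IDs between runs; the state-file location and record key used below are assumptions for illustration:

```python
import json
import os

def load_seen_ids(state_path):
    """Return the set of entity IDs handled by previous runs."""
    if os.path.exists(state_path):
        with open(state_path) as f:
            return set(json.load(f))
    return set()

def save_seen_ids(state_path, seen):
    """Persist processed IDs so the next run can skip them."""
    with open(state_path, "w") as f:
        json.dump(sorted(seen), f)

def filter_unprocessed(records, seen, key="entity_id"):
    """Keep only records not seen in earlier runs."""
    return [r for r in records if r[key] not in seen]
```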
- Real-time matching statistics
- Data quality metrics
- Field-level mismatch analysis
- Tracing analytics with attempt history
- Docker & Docker Compose
- 8GB+ RAM recommended
- Python 3.9+
- Clone the repository

  ```bash
  git clone https://github.com/Chaimaaorg/entity-match-platform.git
  cd entity-match-platform
  ```

- Configure environment

  ```bash
  cp .env.example .env
  # Edit .env with your settings
  ```

- Update configuration

  Edit `config/config.ini`:

  ```ini
  [dev]
  db = local_db
  source_a_main = /app/data/source_a/entities.csv
  source_b_main = /app/data/source_b/entities.csv
  ```

- Start the platform

  ```bash
  docker-compose up -d
  ```

- Access services
  - Airflow UI: http://localhost:8080 (user: `admin`, password: `admin`)
  - Dashboard: Open `visualization/analytics_dashboard.html` in a browser
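The `config/config.ini` values shown above can be read with Python's standard `configparser`; this is a sketch of one way to do it, not necessarily how the pipeline's own loader works:

```python
import configparser

def load_paths(config_path="config/config.ini", env="dev"):
    """Read per-environment source paths from the INI file."""
    parser = configparser.ConfigParser()
    parser.read(config_path)
    section = parser[env]
    return section["source_a_main"], section["source_b_main"]
```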
- Prepare your data
  - Place Source A data in `data/source_a/`
  - Place Source B data in `data/source_b/`
  - Ensure CSV files have proper headers

- Trigger the DAG
  - Open the Airflow UI
  - Enable the `entity_matching_pipeline` DAG
  - Trigger manually or wait for a scheduled run

- View results
  - Matched records: `data/matched/results.csv`
  - Processing logs: Check Airflow task logs
  - Analytics: Load the CSV files into the dashboard
Source A (`data/source_a/entities.csv`):

```csv
entity_id,name,tax_id,registration_number,city
001,Acme Corp,TX123456,REG789,New York
002,Beta LLC,TX234567,REG890,Los Angeles
```

Source B (`data/source_b/entities.csv`):

```csv
entity_id,name,tax_id,registration_number,activity
B001,Acme Corporation,TX123456,REG789,Manufacturing
B002,Beta Limited,TX234567,REG890,Retail
```
- Unique identifier (entity_id, company_id, etc.)
- Name field (company name, organization name, etc.)
- Optional identifiers (tax_id, registration_number, etc.)
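A quick pre-flight check for these requirements might look like the following; the column names are assumptions matching the sample data above, and the platform's own validation may differ:

```python
import csv

REQUIRED = {"entity_id", "name"}  # unique identifier and name field

def check_headers(csv_path):
    """Return the set of required columns missing from the CSV header."""
    with open(csv_path, newline="") as f:
        header = set(next(csv.reader(f)))
    return REQUIRED - header
```

Running this on each source file before triggering the DAG catches schema problems early, rather than mid-pipeline.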
Matching thresholds and field weights are configured through the `MatchingConfig` dataclass:

```python
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class MatchingConfig:
    lsh_num_hash_tables: int = 3
    lsh_distance_threshold: float = 0.5
    similarity_threshold: float = 0.65
    max_candidates_per_record: int = 10
    # A mutable default requires default_factory in a dataclass
    weights: Dict[str, float] = field(default_factory=lambda: {
        "name": 0.45,
        "registration_id": 0.25,
        "tax_id": 0.25,
        "other": 0.05,
    })
```

The enricher is pointed at an external source through its constructor:

```python
enricher = EntityEnricher(
    base_url="https://your-data-source.com/search",
    search_params_mapping={
        "tax_id": "tax_param",
        "registration_id": "reg_param",
        "name": "name_param",
    },
)
```

```
entity-match-platform/
├── airflow/                          # Airflow DAGs and configuration
│   ├── dags/
│   │   └── entity_matching_pipeline.py
│   └── Dockerfile
├── config/                           # Configuration files
│   └── config.ini
├── src/                              # Source code
│   ├── enrichment/                   # Web scraping modules
│   │   └── web_scraper.py
│   └── matching/                     # Matching engine
│       ├── data_preparation.py
│       ├── matcher.py
│       └── utils.py
├── data/                             # Data storage
│   ├── source_a/                     # Primary dataset
│   ├── source_b/                     # Secondary dataset
│   ├── processed/                    # Cleaned data
│   └── matched/                      # Matching results
├── visualization/                    # Analytics dashboard
│   └── analytics_dashboard.html
└── tests/                            # Unit tests
```
- Update `config/config.ini` with the new paths
- Create data loaders in `src/matching/data_preparation.py`
- Adjust field mappings in the matching configuration
Override methods in `src/matching/matcher.py`:

```python
class CustomMatcher(LSHMatcher):
    def _build_feature_pipeline(self, company_col: str):
        # Custom text processing logic
        pass
```

Implement `scrape_entities_from_html()` for your target website:

```python
from bs4 import BeautifulSoup

def scrape_entities_from_html(self, html: str):
    # Parse HTML specific to your data source
    soup = BeautifulSoup(html, "html.parser")
    entities = []
    # Extract entity information from the parsed markup
    return entities
```

Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Submit a pull request
MIT License - see LICENSE file for details