DE Lab Manual
To install and configure Elasticsearch, Kibana, PostgreSQL, and pgAdmin 4, follow the steps below for each tool. The instructions target a typical Ubuntu system, but the steps are similar on other platforms.
Elasticsearch is a distributed search and analytics engine. It stores data in the form of indices
and allows real-time searching and querying.
a. Install Elasticsearch
1. Install prerequisites:
bash
sudo apt update
sudo apt install apt-transport-https openjdk-11-jdk wget
2. Add the Elasticsearch GPG key and APT repository:
bash
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo sh -c 'echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" > /etc/apt/sources.list.d/elastic-8.x.list'
3. Install Elasticsearch:
bash
sudo apt update
sudo apt install elasticsearch
4. Configure Elasticsearch:
o The configuration file is /etc/elasticsearch/elasticsearch.yml.
o Set the network host and HTTP port. Example:
yaml
network.host: 0.0.0.0
http.port: 9200
5. Start and enable Elasticsearch:
bash
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch
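To confirm the node is reachable, query the HTTP endpoint configured above. A minimal check using Python's requests library (note: Elasticsearch 8.x enables security by default, so a stock install may require https and credentials; this sketch assumes security has been relaxed for a lab setup):
python
import requests

# Query the root endpoint of the node configured in elasticsearch.yml
response = requests.get("http://localhost:9200")
print(response.status_code)   # 200 means the node is responding
print(response.json())        # cluster name, version, and other node details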
a. Install Kibana
1. Install Kibana (it is served from the Elastic APT repository added above):
bash
sudo apt update
sudo apt install kibana
2. Configure Kibana:
o The configuration file for Kibana is /etc/kibana/kibana.yml.
o Modify the server.host to bind Kibana to the desired interface. Example:
yaml
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
3. Start and enable Kibana:
bash
sudo systemctl start kibana
sudo systemctl enable kibana
4. Access Kibana:
o Kibana should now be available at http://<your_server_IP>:5601.
a. Install PostgreSQL
1. Install PostgreSQL:
bash
sudo apt update
sudo apt install postgresql postgresql-contrib
2. Verify that PostgreSQL is running:
bash
sudo systemctl status postgresql
3. Start PostgreSQL:
bash
sudo systemctl start postgresql
4. Enable PostgreSQL to start at boot:
bash
sudo systemctl enable postgresql
5. Switch to the postgres user:
bash
sudo -i -u postgres
6. Open the PostgreSQL prompt:
bash
psql
7. Create a database:
sql
CREATE DATABASE my_database;
8. Create a user:
sql
CREATE USER my_user WITH ENCRYPTED PASSWORD 'my_password';
9. Grant privileges:
sql
GRANT ALL PRIVILEGES ON DATABASE my_database TO my_user;
10. Exit the psql prompt:
sql
\q
a. Install pgAdmin 4
1. Add the pgAdmin APT repository:
bash
sudo sh -c 'echo "deb https://ftp.postgresql.org/pub/pgadmin/pgadmin4/deb/ubuntu/ focal pgadmin4" > /etc/apt/sources.list.d/pgadmin4.list'
2. Add the repository signing key:
bash
curl https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo apt-key add -
3. Install pgAdmin 4:
bash
sudo apt update
sudo apt install pgadmin4
4. Configure pgAdmin 4:
o After installation, pgAdmin 4 is accessible via a web browser.
o To configure, you can run the following command:
bash
sudo /usr/pgadmin4/bin/setup-web.sh
o This will ask for an email and password for pgAdmin's web interface.
5. Access pgAdmin 4:
o After configuration, open a browser and go to http://localhost/pgadmin4.
o Log in using the email and password you set up.
Reading a file:
python
# Open a text file and read its full contents ('example.txt' is a placeholder name)
with open('example.txt', 'r') as file:
    content = file.read()
print(content)
Writing to a file:
python
# Open a file in write mode and write to it (placeholder file name and content)
with open('example.txt', 'w') as file:
    file.write('Hello, world!')
Working with CSV files:
python
import csv

# Reading CSV
with open('data.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)

# Writing CSV
with open('output.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Name', 'Age'])
    writer.writerow(['Alice', 25])
Working with JSON files:
python
import json

# Reading JSON
with open('data.json', 'r') as file:
    data = json.load(file)
print(data)

# Writing JSON
with open('output.json', 'w') as file:
    json.dump(data, file, indent=2)
PythonOperator: Execute Python code within your Airflow DAG, including file handling tasks.
BashOperator: You can also use the BashOperator to execute shell commands for file
handling.
python
# Minimal task callables for use with the PythonOperator; the file path is a placeholder.
def read_file():
    with open('/tmp/airflow_demo.txt', 'r') as file:
        print(file.read())

def write_file():
    with open('/tmp/airflow_demo.txt', 'w') as file:
        file.write('Airflow is awesome!')
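These callables can be wired into a DAG with the operators described above. A minimal sketch (the DAG id, schedule, and shell command are illustrative, not part of the original example):
python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

with DAG(dag_id='file_handling_demo', start_date=datetime(2024, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    # Run the Python callables defined above
    write_task = PythonOperator(task_id='write_file', python_callable=write_file)
    read_task = PythonOperator(task_id='read_file', python_callable=read_file)
    # Shell-based file handling via the BashOperator (illustrative command)
    list_task = BashOperator(task_id='list_files', bash_command='ls -l /tmp')

    write_task >> read_task >> list_task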
Apache NiFi is a data integration tool that provides a wide range of processors to handle file-
based operations. Some of the key processors for handling files include:
SplitText: Splits a text file into smaller chunks (by lines, for example).
PutDatabaseRecord: Writes data to a database (related to database handling).
Python provides libraries like sqlite3, pymysql, psycopg2 (for PostgreSQL), and SQLAlchemy
for interacting with databases.
python
import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# Create table
cursor.execute('''CREATE TABLE IF NOT EXISTS users
                  (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)''')

# Insert data
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 25))
conn.commit()

# Reading data
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
    print(row)

conn.close()
e. Databases in Airflow
Airflow provides operators and hooks to interact with various databases, such as MySQL,
PostgreSQL, SQLite, etc.
PythonOperator: Can be used to execute custom Python code to interact with databases.
python
from airflow.providers.postgres.operators.postgres import PostgresOperator

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres_default',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            name VARCHAR(100),
            age INTEGER
        );
    ''',
    dag=dag,  # assumes a DAG object named dag is already defined
)
Apache NiFi offers processors that allow you to interact with databases; some of the most commonly used ones are:
ExecuteSQL: Executes a SQL query on a relational database and returns the result.
Working with databases in Python and integrating them into automated workflows can be
broken down into a few main tasks:
Relational databases like MySQL, PostgreSQL, SQLite, and others are commonly accessed
through Python using connectors or ORMs (Object-Relational Mappers).
python
import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# Create a table
cursor.execute('''CREATE TABLE IF NOT EXISTS users
                  (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)''')

# Insert data
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 30))
conn.commit()

# Extract data
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
    print(row)

conn.close()
You can interact with PostgreSQL databases using libraries like psycopg2:
python
import psycopg2

# Connect to PostgreSQL (credentials match the database and user created earlier)
conn = psycopg2.connect(host='localhost', dbname='my_database',
                        user='my_user', password='my_password')
cursor = conn.cursor()

# Insert data (assumes a users table like the one created above exists)
cursor.execute("INSERT INTO users (name, age) VALUES (%s, %s)", ('Bob', 25))
conn.commit()

# Extract data
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
    print(row)

cursor.close()
conn.close()
SQLAlchemy is a popular ORM that abstracts raw SQL queries and allows you to work with
database objects more naturally.
python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

# Define a model
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)
session = Session()

# Create tables
Base.metadata.create_all(engine)

# Insert data
new_user = User(name='Alice', age=30)
session.add(new_user)
session.commit()

# Extract data
users = session.query(User).all()
for user in users:
    print(f"{user.name}, {user.age}")

session.close()
NoSQL databases, like MongoDB, are non-relational and are accessed differently.
python
from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017")
db = client["testdb"]
collection = db["users"]

# Insert data
collection.insert_one({"name": "Alice", "age": 30})

# Extract data
for user in collection.find():
    print(user)

# Close connection
client.close()
For Cassandra, the cassandra-driver library can be used (the keyspace and users table are assumed to already exist):
python
from cassandra.cluster import Cluster

# Connect to Cassandra
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('testkeyspace')

# Insert data
session.execute("INSERT INTO users (id, name, age) VALUES (uuid(), 'Bob', 25)")

# Extract data
rows = session.execute("SELECT id, name, age FROM users")
for row in rows:
    print(row.name, row.age)

# Close connection
cluster.shutdown()
Apache Airflow is used to automate workflows and can be used to build database pipelines,
whether it's for ETL processes or scheduled database operations.
Example of a simple pipeline that extracts data from one database and loads it into another:
python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Table and query names below are illustrative placeholders.
def extract_data():
    pg_hook = PostgresHook(postgres_conn_id='source_postgres')
    records = pg_hook.get_records("SELECT name, age FROM users")
    return records

def load_data():
    pg_hook = PostgresHook(postgres_conn_id='target_postgres')
    records = extract_data()
    pg_hook.insert_rows(table='users', rows=records)

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(dag_id='postgres_to_postgres', start_date=datetime(2024, 1, 1),
         schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    extract = PythonOperator(task_id='extract_data', python_callable=extract_data)
    load = PythonOperator(task_id='load_data', python_callable=load_data)

    extract >> load
Apache NiFi is a tool for automating the flow of data between systems. To create a database
pipeline in NiFi, you use processors that interact with databases.
Extract data: Use the ExecuteSQL processor to run a query and extract data.
Load data: Use PutSQL to insert the transformed data into the destination database.
This section focuses on techniques for performing exploratory data analysis (EDA), handling
common data issues, and cleaning data in a more automated way using Airflow. Let’s break
it down:
Exploratory Data Analysis (EDA) is the first step in analyzing the data to understand its
structure, identify any patterns, and uncover relationships. Python, particularly with libraries
like pandas, matplotlib, and seaborn, makes it straightforward to perform EDA.
Load the Data:
python
import pandas as pd

data = pd.read_csv("data.csv")
Inspect the Structure:
python
# First-look checks: sample rows, column types, and summary statistics
print(data.head())
data.info()
print(data.describe())
Visualize Data:
Histograms, box plots, or scatter plots can help visualize distributions and relationships.
python
import matplotlib.pyplot as plt

# Histogram of a numeric column ('column_name' is a placeholder)
data['column_name'].hist()
plt.show()

# Box plot of the same column
data.boxplot(column='column_name')
plt.show()
Correlation Analysis:
python
import seaborn as sns

# Correlation matrix of the numeric columns, shown as a heatmap
corr_matrix = data.corr()
sns.heatmap(corr_matrix, annot=True)
plt.show()
Data issues like missing values, duplicates, and incorrect data types can be handled
effectively using pandas.
Handle Missing Values:
python
# Drop rows that contain missing values
data.dropna(inplace=True)
Remove Duplicates:
python
# Drop exact duplicate rows
data.drop_duplicates(inplace=True)
Fix Data Types:
python
# Convert a numeric column stored as text to float ('column_name' is a placeholder)
data['column_name'] = data['column_name'].astype('float64')
# Convert a low-cardinality text column to the categorical dtype
data['category_column'] = data['category_column'].astype('category')
Renaming Columns:
python
# Rename columns to clearer names ('old_name'/'new_name' are placeholders)
data = data.rename(columns={'old_name': 'new_name'})
A DAG in Airflow represents the workflow. Here's a basic structure of an Airflow DAG for
data cleaning:
python
def clean_data():
    import pandas as pd
    data = pd.read_csv('data.csv')
    data.dropna(inplace=True)   # example cleaning step
    data.to_csv('cleaned_data.csv', index=False)
Task Dependencies:
In Airflow, tasks can be chained together. For example, you can first extract data, clean it, and then load it.
python
# Chain tasks with the bitshift operator (task names are illustrative)
extract_task >> clean_task >> load_task
Logging inside tasks:
python
def clean_data():
    import logging
    import pandas as pd
    data = pd.read_csv('data.csv')
    logging.info("Loaded %d rows from data.csv", len(data))
    data.to_csv('cleaned_data.csv', index=False)
    logging.info("Wrote cleaned data to cleaned_data.csv")
python
from airflow.operators.python import PythonOperator

clean_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)
3. Data Ingestion
Data ingestion is the process of collecting raw data and feeding it into the pipeline.
Depending on the nature of your data, the ingestion method will vary:
4. Data Processing
After ingestion, the next step is transforming the data into a useful format, such as cleaning, enriching, or reshaping records (a minimal sketch follows).
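A minimal sketch of such a transformation step, assuming a raw CSV with placeholder file and column names:
python
import pandas as pd

# Read raw data produced by the ingestion step (placeholder file name)
raw = pd.read_csv('raw_orders.csv')

# Clean and enrich: drop incomplete rows, normalize text, derive a new column
clean = raw.dropna(subset=['order_id', 'amount']).copy()
clean['customer'] = clean['customer'].str.strip().str.title()
clean['amount_with_tax'] = clean['amount'] * 1.18   # illustrative tax rate

# Write the transformed output for the storage step
clean.to_csv('clean_orders.csv', index=False)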
5. Data Storage
The transformed data needs to be stored for further analysis or reporting. The storage can be
structured (e.g., relational databases) or unstructured (e.g., data lakes).
6. Data Analysis
After storing the data, the next step is to analyze it to derive insights:
7. Data Presentation
After analysis, presenting data to stakeholders in a user-friendly way is essential. This can
include dashboards, reports, or interactive visualizations.
Dashboards: Tools like Grafana, Power BI, or Tableau can be used to build
interactive visualizations.
Automated Reports: Use scripting or reporting tools to generate regular reports (a minimal sketch follows).
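For instance, a small script can turn summary statistics into a shareable report. A sketch using pandas (file names are placeholders):
python
import pandas as pd

# Load the analyzed data and build a simple summary table
data = pd.read_csv('clean_orders.csv')
summary = data.describe()

# Render the summary as an HTML report that can be emailed or published
summary.to_html('daily_report.html')
print("Report written to daily_report.html")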
8. Orchestration
Orchestrating the data pipeline ensures smooth, scheduled, and automated execution of each stage in the pipeline. It handles dependencies and retries for failed steps. Common tools include Apache Airflow and Apache NiFi.
Data pipelines require ongoing maintenance to ensure they remain efficient, scalable, and
responsive to new data sources or business needs:
Building a Kibana dashboard involves a series of steps to visualize and interact with your
data stored in Elasticsearch. Here’s a step-by-step guide on how to create one:
You need to have data indexed into Elasticsearch. This data could come from logs,
application metrics, etc.
Use tools like Logstash or Beats, or index data directly into Elasticsearch through its REST APIs (a minimal sketch follows).
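A minimal sketch of indexing a single document over the REST API with Python's requests library (the index name and document fields are placeholders; with Elasticsearch security enabled, https and credentials would also be required):
python
import requests

# Index one document into a placeholder index named 'app-logs'
doc = {"timestamp": "2024-01-01T12:00:00", "level": "INFO", "message": "pipeline started"}
response = requests.post("http://localhost:9200/app-logs/_doc", json=doc)
print(response.status_code, response.json())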
In Kibana, you need an Index Pattern that corresponds to the data in Elasticsearch.
Kibana provides various visualization types such as bar charts, line graphs, pie charts, maps,
etc.
Now that you have your visualizations ready, you can create a dashboard.
1. Go to Dashboard in Kibana.
2. Click Create new Dashboard.
3. Click Add and choose the visualizations you’ve created.
4. You can resize, arrange, and configure each visualization’s display.
5. You can also use Filters and Time Ranges to customize the data shown on the
dashboard.
6. After arranging the visualizations to your preference, click Save and provide a name
for the dashboard.
You can share your dashboard by clicking on the Share button in the top right corner.
Kibana allows you to generate a URL or embed the dashboard in other platforms.
Once your dashboard is live, you can continue to monitor the data, make adjustments
to visualizations, and refresh it as needed.
Kibana dashboards are dynamic and will automatically update if the data in
Elasticsearch is updated in real-time (depending on the data refresh rate).
1. Staging Data:
o What it is: Staging data is the process of temporarily storing raw data in a
staging area before further processing or transformation. It ensures that the
data is cleaned, pre-processed, and validated before loading it into production
systems.
o How to achieve it:
Extract data from different sources (databases, APIs, files, etc.).
Store it in a staging area, typically a raw data lake, cloud storage (like S3), or a database (see the sketch after this list).
Format the data: Convert the data into a standardized format for
easier processing (e.g., CSV, Parquet, JSON).
Data Profiling: Analyze the structure, quality, and completeness of the
raw data to understand its characteristics.
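A minimal sketch of this staging step, assuming extracted records are landed as timestamped files in a local staging directory (the directory, records, and field names are placeholders; in practice the target is often an S3 bucket or data lake path):
python
import os
from datetime import datetime
import pandas as pd

STAGING_DIR = 'staging'   # placeholder; could be an S3 prefix instead
os.makedirs(STAGING_DIR, exist_ok=True)

# Records extracted from a source system (placeholder data)
records = [{'order_id': 1, 'amount': 120.5}, {'order_id': 2, 'amount': 80.0}]
raw = pd.DataFrame(records)

# Land each extraction under a timestamped name so batches stay separate
batch_name = datetime.utcnow().strftime('raw_orders_%Y%m%dT%H%M%S.csv')
raw.to_csv(os.path.join(STAGING_DIR, batch_name), index=False)

# Basic profiling of the staged batch
print(raw.describe(include='all'))
print(raw.isnull().sum())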
2. Validating Data:
o What it is: Data validation ensures that the data in your pipeline is accurate,
consistent, and meets predefined rules or constraints.
o How to achieve it:
Schema Validation: Ensure the data matches a predefined schema
(e.g., data types, nullability, range constraints).
Data Quality Checks: Check for missing values, outliers, duplicates, and incorrect formats (a minimal sketch follows this list).
Business Rule Validation: Ensure data satisfies business-specific
rules or logic (e.g., transaction dates must not be in the future).
Automated Testing: Implement automated data quality tests using
tools like Great Expectations or dbt to ensure ongoing validation.
Error Handling: If data doesn’t meet validation rules, trigger an alert,
reject the data, or log issues for manual review.
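A minimal, hand-rolled validation sketch with pandas (the column names, expected schema, and business rule are placeholder assumptions; tools like Great Expectations or dbt express the same checks declaratively):
python
import pandas as pd

data = pd.read_csv('staged_orders.csv', parse_dates=['order_date'])
errors = []

# Schema validation: expected columns present
expected_columns = {'order_id', 'amount', 'order_date'}
if not expected_columns.issubset(data.columns):
    errors.append(f"missing columns: {expected_columns - set(data.columns)}")

# Data quality checks: missing values and duplicates
if data['order_id'].isnull().any():
    errors.append("null order_id values found")
if data.duplicated(subset=['order_id']).any():
    errors.append("duplicate order_id values found")

# Business rule: order dates must not be in the future
if (data['order_date'] > pd.Timestamp.now()).any():
    errors.append("order_date values in the future")

# Error handling: fail loudly so the pipeline can alert, reject, or log the batch
if errors:
    raise ValueError("Validation failed: " + "; ".join(errors))
print("Validation passed")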
1. What it is: Idempotent data pipelines ensure that running the same pipeline multiple
times with the same input does not result in incorrect or inconsistent data. In other
words, the outcome of the pipeline will remain the same, no matter how many times
it's executed with the same data.
2. How to achieve it:
o Use of Unique Identifiers: Ensure that each record processed in the pipeline
has a unique identifier (ID) so that any duplicated data can be handled
appropriately (e.g., overwriting or ignoring duplicates).
o Upserts (Insert or Update): When inserting data into a database, use upsert operations that insert new records or update existing ones based on unique keys. This prevents duplicate records (see the sketch after this list).
o Transactional Integrity: Use database transactions to ensure that operations
like inserts or updates are atomic and roll back in case of failure.
o Avoiding State Duplication: Ensure that intermediate states or outputs from
previous pipeline runs are not reused inappropriately. Use techniques like
time-based partitions or batch IDs to differentiate datasets from different
pipeline executions.
o Idempotent Functions: Make sure that the transformations or calculations
within the pipeline can be repeated without changing the final result, even
when run multiple times.
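A minimal sketch of an idempotent load using a PostgreSQL upsert through psycopg2 (the table, its unique id column, and the connection details are placeholder assumptions reusing the database and user created earlier; rerunning the script with the same rows leaves the table unchanged):
python
import psycopg2

conn = psycopg2.connect(host='localhost', dbname='my_database',
                        user='my_user', password='my_password')
cursor = conn.cursor()

rows = [(1, 'Alice', 30), (2, 'Bob', 25)]   # each row carries a unique id

for user_id, name, age in rows:
    # ON CONFLICT makes the insert idempotent: existing ids are updated, never duplicated
    cursor.execute(
        """
        INSERT INTO users (id, name, age) VALUES (%s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age
        """,
        (user_id, name, age),
    )

conn.commit()
cursor.close()
conn.close()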
1. What it is: Atomic data pipelines refer to pipelines that ensure atomicity, meaning
each pipeline operation is executed completely or not at all. This is crucial for
maintaining the integrity of data processing. If any step of the pipeline fails, all
changes must be rolled back, and no partial data should be left behind.
2. How to achieve it:
o Transactional Integrity: Use database transactions or systems that support
atomic operations (e.g., relational databases or some NoSQL databases) to
group steps together. If one step fails, the transaction is rolled back, ensuring
no partial changes are committed.
o Error Handling and Recovery: Implement robust error handling to capture
failures at any step of the pipeline. If something goes wrong, the pipeline
should be able to either retry or rollback to the last known good state.
o Checkpoints: In long-running or complex pipelines, use checkpoints to
capture the state of the pipeline at various stages. If a failure occurs, you can
restart from the last checkpoint instead of starting over.
o Atomic File Operations: In file-based systems (like HDFS or S3), ensure that files are fully written or updated atomically, for instance by writing to a temporary file and renaming it to the target file only after the process has completed successfully (see the sketch after this list).
o Minimal External Dependencies: Try to minimize the reliance on external
systems or services that may introduce failure points. If external services are
required, ensure they are idempotent and resilient.
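A minimal sketch of the temp-file-then-rename pattern on a local filesystem (the file name and content are placeholders; object stores such as S3 rely on their own mechanisms, e.g. multipart uploads that only become visible once completed):
python
import os
import tempfile

def write_atomically(path, content):
    # Write to a temporary file in the same directory so the final rename stays on one filesystem
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            tmp_file.write(content)
        # os.replace is atomic on POSIX: readers see either the old file or the new one
        os.replace(tmp_path, path)
    except Exception:
        os.remove(tmp_path)   # clean up the partial temporary file on failure
        raise

write_atomically('result.csv', 'id,name\n1,Alice\n')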
Here's an overview of the steps to install and configure NiFi Registry, use it within NiFi,
version your data pipelines, and set up git-persistence with NiFi Registry:
1. Download: Get the NiFi Registry binary distribution from the Apache NiFi downloads page.
2. Extract the archive and enter the directory:
bash
tar -xvf nifi-registry-<version>-bin.tar.gz
cd nifi-registry-<version>
3. Configuration:
o Open the conf/nifi-registry.properties file and configure the following essential settings:
Web HTTP Port: Set the port for accessing the NiFi Registry UI. The default is 18080.
properties
nifi.registry.web.http.port=18080
Repository Directory: Set where the registry stores its data.
properties
nifi.registry.repo.directory=/path/to/storage
4. Start NiFi Registry:
bash
./bin/nifi-registry.sh start
To set up git-persistence, point the registry at a local Git repository with the persistence settings:
properties
nifi.registry.persistence.provider=git
nifi.registry.persistence.git.repo.directory=/path/to/git/repository
nifi.registry.persistence.git.repo.branch=main
Monitoring NiFi data pipelines is crucial to ensure smooth data flow and troubleshoot any
issues that arise in real-time. Below are different ways you can monitor NiFi pipelines:
The NiFi GUI provides an intuitive and interactive way to monitor and manage your data
flows. Some key aspects of monitoring in the GUI include:
3. Processor Metrics:
o Monitor various metrics such as:
Flow File Count
Bytes In and Out
Processing Time per processor
o Click on individual processors to get more detailed statistics like success/failure
rates, the number of flow files processed, and error messages.
4. Bulletins and Alerts:
o NiFi shows bulletins at the top of the screen for any warnings or errors in your data
flow.
o You can filter the bulletins by severity to focus on the most critical issues.
5. NiFi Registry:
o If you are using NiFi Registry, you can monitor versioned flows and ensure that the
changes to data flows are properly tracked.
You can leverage NiFi processors themselves for monitoring data flows:
3. Custom Metrics:
o NiFi provides custom metrics for processors like the QueryDatabaseTable processor
or any custom processors that you might create.
o You can also use ExecuteSQL or ExecuteScript to query your NiFi instance for
custom metrics or flow data.
NiFi’s REST API provides extensive capabilities to monitor and manage NiFi
programmatically. You can use Python to automate monitoring tasks. Here's how to integrate
Python with NiFi’s REST API:
1. List Processors:
python
import requests

base_url = 'http://localhost:8080/nifi-api'

response = requests.get(f'{base_url}/controller/processors')
if response.status_code == 200:
    processors = response.json()
    for processor in processors['processors']:
        print(processor['id'], processor['status'])
2. Processor Metrics:
o For example, if you want to monitor flow file counts or processing times:
python
processor_id = 'your-processor-id'
response = requests.get(f'{base_url}/controller/processors/{processor_id}/status')
status = response.json()
print(f"Processor {processor_id} status: {status['status']}")
3. Bulletins:
python
response = requests.get(f'{base_url}/controller/bulletins')
bulletins = response.json()
for bulletin in bulletins:
    print(f"Bulletin: {bulletin['message']}")
4. Process Group Status:
python
process_group_id = 'your-process-group-id'
response = requests.get(f'{base_url}/flow/process-groups/{process_group_id}/status')
status = response.json()
print(f"Process Group {process_group_id} status: {status['status']}")
5. Automating Alerts:
o You can create automated monitoring scripts in Python that check processor
statuses periodically and send email alerts or other notifications if certain conditions
are met (e.g., high queue size, failure rates).
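A minimal polling-and-alert sketch for this kind of check (the process group id, threshold, and alert action are placeholder assumptions, and the exact JSON keys can vary between NiFi versions):
python
import time
import requests

base_url = 'http://localhost:8080/nifi-api'
process_group_id = 'your-process-group-id'
QUEUE_THRESHOLD = 10000   # assumed alert threshold on queued flow files

while True:
    response = requests.get(f'{base_url}/flow/process-groups/{process_group_id}/status')
    status = response.json()
    queued = status['processGroupStatus']['aggregateSnapshot']['flowFilesQueued']
    if queued > QUEUE_THRESHOLD:
        # Placeholder alert action; replace with an email, Slack, or pager integration
        print(f"ALERT: {queued} flow files queued in process group {process_group_id}")
    time.sleep(60)   # poll once a minute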
6. Provenance Data:
python
response = requests.get(f'{base_url}/provenance')   # base_url already ends in /nifi-api
provenance_data = response.json()
print(provenance_data)
EXP- 11. Deploying Data Pipelines
Deploying a data pipeline to production involves a series of steps to ensure that your pipeline
works reliably and efficiently at scale. This includes finalizing the pipeline design, using
configurations like the NiFi variable registry, and finally deploying the pipeline for
continuous operation.
When you're ready to move your data pipeline into production, it’s important to finalize
various elements of the pipeline for optimal performance, reliability, and monitoring. The
following tasks should be addressed:
2. Error Handling:
o Implement comprehensive error handling to capture and deal with unexpected
failures in the pipeline.
o Set up alerts and logging mechanisms to help monitor the pipeline during
production.
3. Scalability:
o Check whether your pipeline can scale horizontally (across multiple nodes) or
vertically (by adding resources to existing nodes) based on traffic.
o Ensure that resources like memory, CPU, and disk space are appropriately allocated
to handle peak loads.
4. Data Quality:
o Implement checks to monitor data quality, ensuring the integrity and accuracy of the
data being processed.
5. Documentation:
o Document the pipeline architecture, including how each component of the pipeline
functions and interacts.
o Provide clear instructions for troubleshooting and support.
6. Security Considerations:
o Implement security measures such as encryption, secure data transfers, and access
controls to protect sensitive data.
o Ensure compliance with relevant data protection regulations (e.g., GDPR).
Apache NiFi is a powerful data integration tool that simplifies data pipeline creation and
deployment. To make your NiFi-based data pipelines more adaptable and configurable, you
can use the NiFi Variable Registry.
The NiFi Variable Registry allows you to store and manage variables that can be referenced
in different parts of your NiFi flow. These variables can hold values that you want to reuse
across processors, such as API endpoints, database credentials, or other environment-specific
parameters.
2. Creating Variables:
o Click "Add Variable" to create a new variable.
o Provide a variable name and its value (e.g., API_KEY=xyz123).
o You can create variables for things like database credentials, server URLs, or API
keys.
4. Environment-specific Variables:
o You can also define environment-specific variables by creating separate variable
registries for each environment (e.g., dev, staging, production).
By using the Variable Registry, you decouple configuration from your NiFi flow, making it
easier to modify settings for different environments (such as production or development)
without modifying the actual flow design.
Once your data pipeline is ready, the final step is deployment. Deploying a data pipeline
means making it available for continuous operation. The deployment process can differ
depending on the environment and tools used (e.g., NiFi, Kubernetes, or cloud services). The
following is a general approach to deploying data pipelines:
1. Containerization (Optional):
o If using a platform like Kubernetes or Docker, package your data pipeline into
containers. This ensures portability and ease of deployment.
o Use tools like Dockerfile to containerize your NiFi or other pipeline components.
3. Configuration Management:
o Leverage configuration management tools (e.g., Ansible, Terraform, or Chef) to
deploy and manage pipeline configurations across different environments.
o Set environment-specific variables to ensure that the pipeline is properly configured
for production.
7. Post-deployment Verification:
o After deployment, verify that the pipeline is functioning correctly by checking
output, logs, and performance metrics.
o Monitor system resources and check for errors or unexpected behaviors.
Building a production data pipeline involves several key steps to ensure data flows smoothly
from its source to its final destination, while also ensuring scalability, reliability, and proper
monitoring. Here's an overview of each step involved in creating a production-grade data
pipeline:
Before diving into building a pipeline, it's essential to have both test and production
environments, as they help ensure that your pipeline is functioning properly and can handle
the scale of production workloads.
1. Test Environment:
o Isolated: The test environment should be isolated from the production
environment to avoid disruptions.
o Mimic Production: It should replicate the production environment as closely
as possible, in terms of both data volume and structure, but with anonymized
or smaller-scale data.
o Infrastructure as Code (IaC): Use tools like Terraform, CloudFormation, or
Docker Compose to set up repeatable and version-controlled infrastructure for
the test environment.
o Automated Testing: Run unit tests, integration tests, and data validation
checks on data pipelines to ensure they function as expected before
deployment to production.
2. Production Environment:
o Scalable Infrastructure: Set up cloud-based or on-premise infrastructure to
handle production-scale data loads. Utilize elastic services (e.g., AWS, GCP,
Azure) to scale resources dynamically as demand increases.
o Data Storage: Use robust storage solutions like Amazon S3, Google Cloud
Storage, or HDFS, depending on your needs for durability and access speed.
o Monitoring & Logging: Implement comprehensive monitoring tools (e.g.,
Prometheus, Grafana, DataDog) and logging (e.g., ELK stack, Fluentd) to
track performance, errors, and issues in real-time.
o Security: Ensure data encryption, access control, and data masking, especially
if dealing with sensitive information.
1. Data Ingestion:
o Collect raw data from various sources like databases, APIs, files, or streaming
services (e.g., Kafka, AWS Kinesis, Google Pub/Sub).
o Use batch processing (e.g., Apache Spark, AWS Glue) or real-time streaming
(e.g., Apache Flink, Kafka Streams) depending on your use case.
2. Data Transformation:
o Data cleaning, enrichment, and transformation happen here. Tools like
Apache Airflow, dbt (Data Build Tool), or AWS Glue are commonly used
for orchestration and transformation.
o ETL vs ELT: Decide whether you'll perform ETL (Extract, Transform, Load)
or ELT (Extract, Load, Transform) based on the tools and infrastructure
available.
o Ensure data transformations are idempotent (i.e., rerun without causing
inconsistencies) to handle retries in case of failure.
3. Data Storage:
o Store transformed data in databases or data lakes (e.g., Amazon Redshift,
Google BigQuery, Snowflake) or NoSQL solutions (e.g., MongoDB,
Cassandra).
o Consider partitioning, indexing, and using columnar storage formats (e.g.,
Parquet, ORC) for optimized query performance.
4. Data Quality and Validation:
o Implement data validation checks to ensure data integrity, such as schema
validation and range checks.
o Tools like Great Expectations and Deequ can be used for automated data
validation.
5. Data Orchestration:
o Orchestrate tasks in the pipeline using tools like Apache Airflow, Dagster, or
Kubeflow Pipelines to schedule and automate workflows.
o Ensure the pipeline can handle failure and recovery, so tasks can retry in case
of issues.