Airflow DAG — Best Practices
DAG as configuration file
The Airflow scheduler parses and processes DAG files at each heartbeat.
If DAG files are heavy and contain a lot of top-level code, the
scheduler will spend considerable resources and time processing them
at every heartbeat. So it is advised to keep DAGs light, more like a
configuration file. A step further is to define the workflow in
YAML/JSON and generate the DAG from that definition. This has a double
advantage: 1. Since DAGs are generated programmatically, they are
consistent and reproducible at any time. 2. Non-Python users can
author workflows as well.
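Here is a minimal sketch of a YAML-driven DAG. The workflow.yaml layout, its path, and the use of BashOperator are illustrative assumptions, and the import paths follow Airflow 2.x:

import yaml
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Assumed layout of workflow.yaml:
#   dag_id: sample_workflow
#   schedule: "@daily"
#   tasks:
#     - id: extract
#       command: "python /opt/jobs/extract.py"
with open("/opt/airflow/dags/workflow.yaml") as f:
    config = yaml.safe_load(f)

with DAG(
    dag_id=config["dag_id"],
    schedule_interval=config.get("schedule", "@daily"),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    for task_cfg in config["tasks"]:
        BashOperator(task_id=task_cfg["id"], bash_command=task_cfg["command"])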
Non-configuration code blocks can also be kept outside the DAG
definition and referenced through the template_searchpath attribute.
For example, if you are connecting to an RDS instance and executing a
SQL command, that SQL command should be loaded from a file, and the
directory containing the file should be listed in template_searchpath.
The same applies to Hive queries (.hql).
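A minimal sketch of this pattern, assuming the SQL file lives in /opt/airflow/sql and using the Postgres provider's PostgresOperator (the connection id and file name are illustrative):

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="rds_cleanup",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    template_searchpath=["/opt/airflow/sql"],  # where cleanup.sql is looked up
    catchup=False,
) as dag:
    PostgresOperator(
        task_id="run_cleanup",
        postgres_conn_id="rds_default",
        sql="cleanup.sql",  # loaded from template_searchpath and rendered by Jinja
    )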
Invest in Airflow plugin system
Maintain a proper plugin repository for authoring the custom plugins
your organization needs. When creating a plugin, keep it generic so it
is reusable across use cases. This helps with versioning and keeps
workflows clean, mostly configuration details rather than
implementation logic. Also, do not perform heavy work while
initializing the operator class; push such operations into the
execute method.
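A sketch of what "light init, heavy execute" can look like in a custom operator; the operator name, connection id, and use of PostgresHook are assumptions:

from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

class RunSqlOperator(BaseOperator):
    template_fields = ("sql",)

    def __init__(self, sql, conn_id="rds_default", **kwargs):
        # Keep __init__ cheap: only store configuration here, never open
        # connections, since operators are instantiated on every DAG parse.
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id

    def execute(self, context):
        # Heavy work (connections, queries) happens only when the task runs.
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        return hook.get_first(self.sql)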
Do not perform data processing in DAG files
Since DAGs are Python-based, we will definitely be tempted to use
pandas or similar libraries in them, but we should not. Airflow is an
orchestrator, not an execution framework; all computation should be
delegated to a specific target system.
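As one illustration of delegating computation, the sketch below hands a transformation to Spark instead of running it in the DAG file; it assumes the Apache Spark provider is installed, a spark_default connection exists, and the job path is hypothetical:

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="daily_transform",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    SparkSubmitOperator(
        task_id="transform",
        application="/opt/jobs/transform.py",  # assumed path to the Spark job
        conn_id="spark_default",
    )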
Delegate API or DB calls to operators
This is somewhat similar to the first point. API calls or DB
connections made in top-level code in DAG files overload the scheduler
and webserver, because any call defined outside an operator is executed
at every heartbeat. It is advisable to push these calls down into an
operator, for example a util/common operator or a PythonOperator
callable.
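A minimal sketch of the difference; the API URL is a placeholder:

from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

# Anti-pattern (would run at every parse/heartbeat):
# latest = requests.get("https://api.example.com/latest").json()

def fetch_latest():
    # Runs only when the task executes, not when the DAG file is parsed.
    return requests.get("https://api.example.com/latest", timeout=30).json()

with DAG(
    dag_id="api_pull",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    PythonOperator(task_id="fetch_latest", python_callable=fetch_latest)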
Make DAGs/Tasks idempotent
A DAG should produce the same data on every run for the same logical
date. Read from a partition and write to a partition, and treat both as
immutable. Handle partition creation and deletion explicitly to avoid
unexpected errors.
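A sketch of an idempotent, partition-keyed load; the table names and connection id are assumptions:

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="partitioned_load",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # The task always rewrites exactly one partition, keyed on the logical
    # date ({{ ds }}), so re-running it produces the same data.
    PostgresOperator(
        task_id="load_partition",
        postgres_conn_id="dwh_default",
        sql=[
            "DELETE FROM sales WHERE ds = '{{ ds }}'",
            "INSERT INTO sales SELECT * FROM staging_sales WHERE ds = '{{ ds }}'",
        ],
    )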
Use single variable per DAG
Every access to an Airflow Variable opens a connection to the metadata
DB. With multiple DAGs running and multiple Variables being read, this
can overload the DB. It is better to use a single Variable per DAG that
holds a JSON object; this creates a single connection, and the JSON can
be parsed to get the desired key-value pairs.
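For example (the variable name and keys are illustrative):

from airflow.models import Variable

# One Variable per DAG, stored as JSON, fetched with a single metadata-DB call.
# Ideally read it inside a task or via {{ var.json.my_dag_config.some_key }}
# templating rather than at module top level.
config = Variable.get("my_dag_config", deserialize_json=True)

source_path = config["source_path"]    # e.g. "s3://bucket/raw"
target_table = config["target_table"]  # e.g. "sales"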
Tag the DAG
Tags on a DAG help with filtering and grouping DAGs. Keep them
consistent with your infrastructure's current tagging system, for
example tags based on business unit, project, or application category.
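For example (the tag values are illustrative; use your organization's scheme):

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="finance_daily_report",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    tags=["bu-finance", "project-reporting", "app-batch"],
    catchup=False,
) as dag:
    ...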
Don’t Abuse XCom
XCom acts as a channel between tasks for sharing data, and it uses the
backend DB to do so. Hence we should not pass large amounts of data
through it; bigger payloads will overload the backend DB.
Use intermediate storage between tasks
If the data to be shared between two tasks is large, store it in an
intermediate storage system and pass only a reference to it to the
downstream task.
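A sketch of passing a reference instead of the data; the S3 bucket and layout are assumptions, and the context handling follows Airflow 2.x:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract(**context):
    s3_key = f"s3://my-bucket/raw/{context['ds']}/data.parquet"
    # ... write the extracted data to that location ...
    return s3_key  # the return value (a small string) is pushed to XCom

def transform(**context):
    s3_key = context["ti"].xcom_pull(task_ids="extract")
    # ... read the data from s3_key and process it ...

with DAG(
    dag_id="xcom_reference_passing",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    extract_task >> transform_task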
Use the power of Jinja templating
Many of the operators support template_fields. This tuple defines which
fields will be rendered with Jinja.
class PythonOperator(BaseOperator):
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')
When writing your own custom operator, override this template_fields
attribute.
class CustomBashOperator(BaseOperator):
    template_fields = ('file_name', 'command', 'dest_host')
In the above example, the fields 'file_name', 'command', and
'dest_host' will be available for Jinja templating.
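A usage sketch, assuming the CustomBashOperator above accepts these keyword arguments and is used inside a with DAG(...) block; the Jinja expressions are rendered at runtime because the fields appear in template_fields:

copy_file = CustomBashOperator(
    task_id="copy_daily_file",
    file_name="events_{{ ds }}.csv",
    command="scp /data/events_{{ ds }}.csv etl@{{ var.value.ingest_host }}:/landing/",
    dest_host="{{ var.value.ingest_host }}",
)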
Implement DAG Level Access control
Leverage Flask AppBuilder views to implement DAG-level access control.
Set the DAG owner to the correct Linux user, and create custom roles to
decide who can take DAG/Task actions.
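One way to express this is the DAG's access_control argument; the role name below is an assumption and must already exist in the RBAC UI, and permission names vary slightly between Airflow versions:

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="payments_reconciliation",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    access_control={"team-finance": {"can_read", "can_edit"}},
    catchup=False,
) as dag:
    ...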
Use static start_date
A static DAG start_date helps in correctly populating DAG runs and the
schedule; avoid dynamic values such as datetime.now().
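For example:

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="static_start_example",
    start_date=datetime(2021, 1, 1),  # static; avoid datetime.now()
    schedule_interval="@daily",
    catchup=False,
) as dag:
    ...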
Rename DAGs in case of structural change
Until the DAG versioning feature is implemented, rename the DAG
whenever its structure changes. This creates a new DAG, and the run
history of the old DAG version is preserved without any inconsistency.
Some other best practices:
Set retries at the DAG level
Use consistent file structure
Choose a consistent method for task dependencies
Have notification strategy on failure
Keep an eye on the upcoming enhancements to Airflow:
Functional DAG
DAG Serialization
Scheduler HA
Production grade REST APIs
Smart Sensors
Task Groups
Have fun with DAGs