Skip to content

Girgitt/airflow-dag-lab

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Airflow 3.2.1 training repo — testowalne DAG-i, dag_lib i dwa tryby DAG-a

To repozytorium jest praktycznym szablonem szkoleniowym dla Airflow 3.2.1. Pokazuje dwa równoległe modele pracy:

  1. lokalne testy jednostkowe bez Airflow dla logiki biznesowej,
  2. uruchamianie DAG-ów Airflow z rozdzieleniem runtime Airflow i runtime logiki biznesowej.

Pipeline szkoleniowy nazywa się iot_ingress i realizuje wariant EL(t):

  1. Extract + Load — odczyt z bazy źródłowej i ładowanie surowych danych do tabeli stagingowej w bazie docelowej.
  2. Plan transformacji — czysta logika Pythonowa z zależnością pydantic, uruchamiana w izolowanym runtime.
  3. Transform in DB — transformacja SQL po stronie bazy docelowej i zapis do tabeli wynikowej.

Docelowy układ repo

.
├── dags/
│   └── iot_ingress_dag.py
├── dag_lib/
│   ├── shared/
│   │   └── repo_paths.py
│   ├── airflow_runtime/
│   │   ├── dbapi.py
│   │   └── iot_ingress_db.py
│   └── business_runtime/
│       └── iot_ingress/
│           └── transform_plan.py
├── tests/
│   ├── fixtures/
│   └── unit/
├── requirements/
├── scripts/
├── compose.yaml
├── pyproject.toml
└── pytest.ini

Dlaczego dag_lib/

dag_lib/ to wspólny kod Pythona używany przez DAG-i, testy lokalne i izolowane runtime tasków.

  • dag_lib/airflow_runtime/ zawiera kod, który może importować airflow.sdk i korzystać z connectionów oraz sterowników DB.
  • dag_lib/business_runtime/ zawiera kod, który nie może importować airflow.*. To jest kod testowany lokalnie przez pytest i uruchamiany w virtualenv lub external_python.
  • dags/ zawiera tylko cienki plik DAG-a, który składa taski i harmonogramy.

Taki układ ułatwia jednocześnie:

  • pracę w IDE bez ładowania DAG-ów do klastra,
  • rozdział zależności między runtime Airflow i business runtime,
  • czytelne szkolenie zespołu.

Dwa DAG-i w jednym pliku

W dags/iot_ingress_dag.py zdefiniowane są dwa DAG-i:

iot_ingress_dev

  • zawsze używa @task.virtualenv,
  • służy do codziennego developmentu i szybkiej walidacji zmian,
  • nie zależy od globalnej konfiguracji instancji Airflow.

iot_ingress

  • działa jak DAG „produkcyjny”,
  • korzysta z IOT_INGRESS_RUNTIME_MODE=virtualenv|external,
  • pozwala w środowisku developerskim przetestować dokładnie ten wariant runtime, który chcesz potem użyć na produkcji.

Dzięki temu zwykły loop developerski jest stabilny (iot_ingress_dev), a walidacja przedwdrożeniowa może sprawdzać również model 3 (iot_ingress).

Modele runtime logiki biznesowej

Model 2 — virtualenv

IOT_INGRESS_RUNTIME_MODE=virtualenv

Task tworzy lub odtwarza venv na podstawie requirements i wheelhouse. Po zbudowaniu cache kolejne uruchomienia powinny być szybsze. Ten tryb jest wymuszony przez iot_ingress_dev i może też być użyty przez iot_ingress.

Model 3 — external_python

IOT_INGRESS_RUNTIME_MODE=external
IOT_INGRESS_EXTERNAL_PYTHON=/opt/airflow/task-venvs/iot_ingress/prod/bin/python

Task korzysta z wcześniej przygotowanego interpretera. Venv przygotowuje skrypt scripts/prepare_external_venv.sh. To jest wariant do walidacji przedwdrożeniowej i dla środowisk, w których chcesz kontrolować lifecycle zależności poza samym wykonaniem taska.

Szybki start lokalny: CLI-first dev workflow bez Airflow

scripts/create_local_venv.sh python3.11
scripts/download_iot_wheelhouse.sh
scripts/install_local_dev_requirements.sh
. .venv/bin/activate
pytest

To uruchamia wyłącznie testy business runtime i helperów repo. Nie potrzebujesz schedulera, bazy metadanych ani Airflow SDK.

Co robią nowe skrypty

  • scripts/create_local_venv.sh <python-command>

    • tworzy lokalny katalog .venv,
    • wymaga jawnego wskazania interpretera, np. python3.11,
    • jest przydatny szczególnie w VSCode i w pracy czysto terminalowej.
  • scripts/install_local_dev_requirements.sh

    • instaluje runtime requirements i dev requirements z lokalnego wheelhouse,
    • używa trybu offline-friendly (--no-index, --find-links),
    • finalnie wykonuje editable install repo, aby importy dag_lib działały poprawnie.

Wheelhouse lokalny:

./pip-offline-modules_linux-x86_64-cp311

Jeżeli wheelhouse nie istnieje albo jest niepełny, przygotuj go najpierw przez:

scripts/download_iot_wheelhouse.sh

Praca w PyCharm

  1. Utwórz i aktywuj lokalne venv:
    scripts/create_local_venv.sh python3.11
    scripts/download_iot_wheelhouse.sh
    scripts/install_local_dev_requirements.sh
    . .venv/bin/activate
  2. W PyCharm wybierz interpreter ./.venv/bin/python.
  3. Ustaw test runner na pytest.
  4. Otwieraj i edytuj:
    • dag_lib/business_runtime/... dla logiki biznesowej,
    • dag_lib/airflow_runtime/... dla logiki zależnej od Airflow/DB,
    • dags/iot_ingress_dag.py dla harmonogramu i zależności tasków.
  5. Uruchamiaj lokalnie tests/unit/... przed każdą próbą w Compose.

Po migracji nie trzeba oznaczać dags/ jako source root dla lokalnych testów jednostkowych.

Praca w VSCode

  1. Zainstaluj rozszerzenie Python.
  2. Przygotuj lokalne środowisko z CLI:
    scripts/create_local_venv.sh python3.11
    scripts/download_iot_wheelhouse.sh
    scripts/install_local_dev_requirements.sh
  3. Wybierz interpreter ./.venv/bin/python.
  4. Dodaj ustawienia robocze, np. w .vscode/settings.json:
    {
      "python.testing.pytestEnabled": true,
      "python.testing.unittestEnabled": false,
      "python.testing.pytestArgs": ["tests"],
      "python.analysis.extraPaths": ["."]
    }
  5. Używaj terminala w repo root i lokalnego venv do uruchamiania pytest oraz ruff.

To jest główna przewaga nowego workflow dla VSCode: przygotowanie środowiska nie zależy od funkcji IDE, tylko od jawnych skryptów repo.

Przygotowanie lokalnego środowiska Airflow przez compose.yaml

Najprostszy workflow repo-native:

  1. Przygotuj bind mounty, .env i podstawowe preflight checks:
    scripts/prepare_for_docker.sh
  2. Jeżeli skrypt zgłosi brak wheelhouse dla tasków virtualenv, przygotuj go:
    scripts/download_iot_wheelhouse.sh
  3. Uruchom init i podstawowe usługi Airflow:
    scripts/start_local_airflow.sh
  4. Otwórz UI:
    http://localhost:8080
    
    Login: airflow
    Hasło: airflow
  5. Potwierdź auth przez API:
    curl -sS -X POST http://localhost:8080/auth/token \
      -H 'Content-Type: application/json' \
      -d '{"username":"airflow","password":"airflow"}'
    Oczekiwany wynik: odpowiedź JSON z polem access_token i kod HTTP 201.

Compose uruchamia:

  • lokalny Airflow 3.2.1,
  • bazę metadanych Airflow,
  • bazę źródłową iot_source_db,
  • bazę docelową iot_target_db.

Co robi scripts/prepare_for_docker.sh

Skrypt:

  • kopiuje .env.example do .env, jeśli .env nie istnieje,
  • tworzy katalogi logs, plugins, config, .local/task-venvs, .local/venv-cache,
  • ustawia prawa zapisu dla logs, .local/task-venvs, .local/venv-cache,
  • sprawdza dostępność docker, docker compose i działający daemon,
  • sprawdza obecność config/simple_auth_manager_passwords.json,
  • sprawdza, czy wheelhouse pip-offline-modules_linux-x86_64-cp311 istnieje.

Permissions są ustawiane celowo, bo kontenery Airflow pracują jako UID 50000, a bind mounty tworzone przez użytkownika hosta powodowały już realne błędy:

  • brak zapisu do /opt/airflow/logs/...,
  • brak zapisu do /opt/airflow/venv-cache/....

Lokalny stack używa SimpleAuthManager, a konto airflow/airflow jest bootstrapowane przez konfigurację:

  • AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS=airflow:admin
  • config/simple_auth_manager_passwords.json
  • airflow-init, który kopiuje seed haseł do współdzielonego wolumenu runtime /opt/airflow/auth

Jeżeli po starcie UI/API dostajesz 401, sprawdź kolejno:

  1. czy docker compose up airflow-init zakończył się sukcesem,
  2. czy plik config/simple_auth_manager_passwords.json nadal zawiera wpis {"airflow":"airflow"},
  3. czy docker compose up -d uruchomił airflow-api-server,
  4. czy request do /auth/token jest wysyłany z loginem airflow i hasłem airflow.

Miękki reload DAG-ów po zmianie kodu

Po zmianie kodu DAG-a lub helperów używanych przy parsowaniu uruchom:

scripts/reload_airflow_dags.sh

Skrypt restartuje:

  • airflow-dag-processor
  • airflow-scheduler
  • airflow-api-server

To jest preferowany „soft reload” przed kolejnym runem DAG-a. Po restarcie odśwież UI i uruchom nowy run.

Jak przejść szkolenie krok po kroku

Etap 1 — poznanie struktury

Uczestnik identyfikuje:

  • cienki plik DAG-a,
  • dag_lib/airflow_runtime/,
  • dag_lib/business_runtime/,
  • requirements i wheelhouse,
  • testy fixture-based.

Etap 2 — lokalne testy jednostkowe

Uruchom:

pytest

Następnie zmień transform_plan.py i popraw test, jeśli zmieniasz kontrakt wyniku.

Etap 3 — codzienny development przez iot_ingress_dev

Uruchom lokalny stack Airflow i odpal DAG iot_ingress_dev.

To jest podstawowy DAG developerski:

  • zawsze korzysta z virtualenv,
  • nie zależy od globalnego przełącznika runtime.

Etap 4 — walidacja DAG-a iot_ingress w trybie virtualenv

Upewnij się, że w .env jest:

IOT_INGRESS_RUNTIME_MODE=virtualenv

Jeżeli zmieniłeś .env, odtwórz komponenty parsujące DAG-i:

docker compose up -d --force-recreate airflow-scheduler airflow-dag-processor airflow-api-server

Uruchom iot_ingress, aby sprawdzić, czy „produkcyjny” DAG działa jeszcze w modelu 2.

Etap 5 — przygotowanie external runtime

Uruchom:

docker compose run --rm airflow-cli bash scripts/prepare_external_venv.sh

Skrypt buduje external venv pod stałą ścieżką:

/opt/airflow/task-venvs/iot_ingress/prod/bin/python

Używaj go tylko wtedy, gdy external runtime nie jest równolegle używany przez task.

Etap 6 — walidacja DAG-a iot_ingress w trybie external

Ustaw w .env:

IOT_INGRESS_RUNTIME_MODE=external
IOT_INGRESS_EXTERNAL_PYTHON=/opt/airflow/task-venvs/iot_ingress/prod/bin/python

Przeładuj komponenty Airflow odpowiedzialne za parsowanie/serializację DAG-ów:

docker compose up -d --force-recreate airflow-scheduler airflow-dag-processor airflow-api-server

Uruchom iot_ingress i potwierdź, że pipeline działa w external runtime.

Etap 7 — decyzja przed synchronizacją na produkcję

Przed synchronizacją zmian na branch produkcyjny wykonaj co najmniej:

  • lokalne pytest,
  • iot_ingress_dev,
  • iot_ingress w tym modelu runtime, który ma być użyty docelowo.

Kiedy używać czego

  • Local unit tests — gdy zmieniasz czystą logikę business runtime.
  • iot_ingress_dev — gdy iterujesz na zmianach i chcesz stabilny model 2.
  • iot_ingress — gdy chcesz sprawdzić zachowanie DAG-a w modelu runtime skonfigurowanym dla instancji.

Najważniejsze reguły projektowe

  1. DAG nie wykonuje IO na poziomie importu.
  2. dag_lib/business_runtime/ nie importuje airflow.*.
  3. Połączenia i sekrety są odczytywane wyłącznie w dag_lib/airflow_runtime/.
  4. Duże dane nie przechodzą przez XCom.
  5. SQL transformujący działa w target DB.
  6. Requirements business runtime są jawne i wersjonowane.
  7. Wheelhouse jest częścią procesu release albo artefaktu deploymentowego.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors