To repozytorium jest praktycznym szablonem szkoleniowym dla Airflow 3.2.1. Pokazuje dwa równoległe modele pracy:
- lokalne testy jednostkowe bez Airflow dla logiki biznesowej,
- uruchamianie DAG-ów Airflow z rozdzieleniem runtime Airflow i runtime logiki biznesowej.
Pipeline szkoleniowy nazywa się iot_ingress i realizuje wariant EL(t):
- Extract + Load — odczyt z bazy źródłowej i ładowanie surowych danych do tabeli stagingowej w bazie docelowej.
- Plan transformacji — czysta logika Pythonowa z zależnością
pydantic, uruchamiana w izolowanym runtime. - Transform in DB — transformacja SQL po stronie bazy docelowej i zapis do tabeli wynikowej.
.
├── dags/
│ └── iot_ingress_dag.py
├── dag_lib/
│ ├── shared/
│ │ └── repo_paths.py
│ ├── airflow_runtime/
│ │ ├── dbapi.py
│ │ └── iot_ingress_db.py
│ └── business_runtime/
│ └── iot_ingress/
│ └── transform_plan.py
├── tests/
│ ├── fixtures/
│ └── unit/
├── requirements/
├── scripts/
├── compose.yaml
├── pyproject.toml
└── pytest.ini
dag_lib/ to wspólny kod Pythona używany przez DAG-i, testy lokalne i izolowane runtime tasków.
dag_lib/airflow_runtime/zawiera kod, który może importowaćairflow.sdki korzystać z connectionów oraz sterowników DB.dag_lib/business_runtime/zawiera kod, który nie może importowaćairflow.*. To jest kod testowany lokalnie przezpytesti uruchamiany wvirtualenvlubexternal_python.dags/zawiera tylko cienki plik DAG-a, który składa taski i harmonogramy.
Taki układ ułatwia jednocześnie:
- pracę w IDE bez ładowania DAG-ów do klastra,
- rozdział zależności między runtime Airflow i business runtime,
- czytelne szkolenie zespołu.
W dags/iot_ingress_dag.py zdefiniowane są dwa DAG-i:
- zawsze używa
@task.virtualenv, - służy do codziennego developmentu i szybkiej walidacji zmian,
- nie zależy od globalnej konfiguracji instancji Airflow.
- działa jak DAG „produkcyjny”,
- korzysta z
IOT_INGRESS_RUNTIME_MODE=virtualenv|external, - pozwala w środowisku developerskim przetestować dokładnie ten wariant runtime, który chcesz potem użyć na produkcji.
Dzięki temu zwykły loop developerski jest stabilny (iot_ingress_dev), a walidacja przedwdrożeniowa może sprawdzać również model 3 (iot_ingress).
IOT_INGRESS_RUNTIME_MODE=virtualenvTask tworzy lub odtwarza venv na podstawie requirements i wheelhouse. Po zbudowaniu cache kolejne uruchomienia powinny być szybsze. Ten tryb jest wymuszony przez iot_ingress_dev i może też być użyty przez iot_ingress.
IOT_INGRESS_RUNTIME_MODE=external
IOT_INGRESS_EXTERNAL_PYTHON=/opt/airflow/task-venvs/iot_ingress/prod/bin/pythonTask korzysta z wcześniej przygotowanego interpretera. Venv przygotowuje skrypt scripts/prepare_external_venv.sh. To jest wariant do walidacji przedwdrożeniowej i dla środowisk, w których chcesz kontrolować lifecycle zależności poza samym wykonaniem taska.
scripts/create_local_venv.sh python3.11
scripts/download_iot_wheelhouse.sh
scripts/install_local_dev_requirements.sh
. .venv/bin/activate
pytestTo uruchamia wyłącznie testy business runtime i helperów repo. Nie potrzebujesz schedulera, bazy metadanych ani Airflow SDK.
-
scripts/create_local_venv.sh <python-command>- tworzy lokalny katalog
.venv, - wymaga jawnego wskazania interpretera, np.
python3.11, - jest przydatny szczególnie w VSCode i w pracy czysto terminalowej.
- tworzy lokalny katalog
-
scripts/install_local_dev_requirements.sh- instaluje runtime requirements i dev requirements z lokalnego wheelhouse,
- używa trybu offline-friendly (
--no-index,--find-links), - finalnie wykonuje editable install repo, aby importy
dag_libdziałały poprawnie.
Wheelhouse lokalny:
./pip-offline-modules_linux-x86_64-cp311
Jeżeli wheelhouse nie istnieje albo jest niepełny, przygotuj go najpierw przez:
scripts/download_iot_wheelhouse.sh- Utwórz i aktywuj lokalne venv:
scripts/create_local_venv.sh python3.11 scripts/download_iot_wheelhouse.sh scripts/install_local_dev_requirements.sh . .venv/bin/activate - W PyCharm wybierz interpreter
./.venv/bin/python. - Ustaw test runner na
pytest. - Otwieraj i edytuj:
dag_lib/business_runtime/...dla logiki biznesowej,dag_lib/airflow_runtime/...dla logiki zależnej od Airflow/DB,dags/iot_ingress_dag.pydla harmonogramu i zależności tasków.
- Uruchamiaj lokalnie
tests/unit/...przed każdą próbą w Compose.
Po migracji nie trzeba oznaczać dags/ jako source root dla lokalnych testów jednostkowych.
- Zainstaluj rozszerzenie Python.
- Przygotuj lokalne środowisko z CLI:
scripts/create_local_venv.sh python3.11 scripts/download_iot_wheelhouse.sh scripts/install_local_dev_requirements.sh
- Wybierz interpreter
./.venv/bin/python. - Dodaj ustawienia robocze, np. w
.vscode/settings.json:{ "python.testing.pytestEnabled": true, "python.testing.unittestEnabled": false, "python.testing.pytestArgs": ["tests"], "python.analysis.extraPaths": ["."] } - Używaj terminala w repo root i lokalnego venv do uruchamiania
pytestorazruff.
To jest główna przewaga nowego workflow dla VSCode: przygotowanie środowiska nie zależy od funkcji IDE, tylko od jawnych skryptów repo.
Najprostszy workflow repo-native:
- Przygotuj bind mounty,
.envi podstawowe preflight checks:scripts/prepare_for_docker.sh
- Jeżeli skrypt zgłosi brak wheelhouse dla tasków
virtualenv, przygotuj go:scripts/download_iot_wheelhouse.sh
- Uruchom init i podstawowe usługi Airflow:
scripts/start_local_airflow.sh
- Otwórz UI:
Login:
http://localhost:8080airflow
Hasło:airflow - Potwierdź auth przez API:
Oczekiwany wynik: odpowiedź JSON z polem
curl -sS -X POST http://localhost:8080/auth/token \ -H 'Content-Type: application/json' \ -d '{"username":"airflow","password":"airflow"}'
access_tokeni kod HTTP201.
Compose uruchamia:
- lokalny Airflow 3.2.1,
- bazę metadanych Airflow,
- bazę źródłową
iot_source_db, - bazę docelową
iot_target_db.
Skrypt:
- kopiuje
.env.exampledo.env, jeśli.envnie istnieje, - tworzy katalogi
logs,plugins,config,.local/task-venvs,.local/venv-cache, - ustawia prawa zapisu dla
logs,.local/task-venvs,.local/venv-cache, - sprawdza dostępność
docker,docker composei działający daemon, - sprawdza obecność
config/simple_auth_manager_passwords.json, - sprawdza, czy wheelhouse
pip-offline-modules_linux-x86_64-cp311istnieje.
Permissions są ustawiane celowo, bo kontenery Airflow pracują jako UID 50000, a bind mounty tworzone przez użytkownika hosta powodowały już realne błędy:
- brak zapisu do
/opt/airflow/logs/..., - brak zapisu do
/opt/airflow/venv-cache/....
Lokalny stack używa SimpleAuthManager, a konto airflow/airflow jest bootstrapowane przez konfigurację:
AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS=airflow:adminconfig/simple_auth_manager_passwords.jsonairflow-init, który kopiuje seed haseł do współdzielonego wolumenu runtime/opt/airflow/auth
Jeżeli po starcie UI/API dostajesz 401, sprawdź kolejno:
- czy
docker compose up airflow-initzakończył się sukcesem, - czy plik
config/simple_auth_manager_passwords.jsonnadal zawiera wpis{"airflow":"airflow"}, - czy
docker compose up -duruchomiłairflow-api-server, - czy request do
/auth/tokenjest wysyłany z loginemairflowi hasłemairflow.
Po zmianie kodu DAG-a lub helperów używanych przy parsowaniu uruchom:
scripts/reload_airflow_dags.shSkrypt restartuje:
airflow-dag-processorairflow-schedulerairflow-api-server
To jest preferowany „soft reload” przed kolejnym runem DAG-a. Po restarcie odśwież UI i uruchom nowy run.
Uczestnik identyfikuje:
- cienki plik DAG-a,
dag_lib/airflow_runtime/,dag_lib/business_runtime/,- requirements i wheelhouse,
- testy fixture-based.
Uruchom:
pytestNastępnie zmień transform_plan.py i popraw test, jeśli zmieniasz kontrakt wyniku.
Uruchom lokalny stack Airflow i odpal DAG iot_ingress_dev.
To jest podstawowy DAG developerski:
- zawsze korzysta z
virtualenv, - nie zależy od globalnego przełącznika runtime.
Upewnij się, że w .env jest:
IOT_INGRESS_RUNTIME_MODE=virtualenvJeżeli zmieniłeś .env, odtwórz komponenty parsujące DAG-i:
docker compose up -d --force-recreate airflow-scheduler airflow-dag-processor airflow-api-serverUruchom iot_ingress, aby sprawdzić, czy „produkcyjny” DAG działa jeszcze w modelu 2.
Uruchom:
docker compose run --rm airflow-cli bash scripts/prepare_external_venv.shSkrypt buduje external venv pod stałą ścieżką:
/opt/airflow/task-venvs/iot_ingress/prod/bin/python
Używaj go tylko wtedy, gdy external runtime nie jest równolegle używany przez task.
Ustaw w .env:
IOT_INGRESS_RUNTIME_MODE=external
IOT_INGRESS_EXTERNAL_PYTHON=/opt/airflow/task-venvs/iot_ingress/prod/bin/pythonPrzeładuj komponenty Airflow odpowiedzialne za parsowanie/serializację DAG-ów:
docker compose up -d --force-recreate airflow-scheduler airflow-dag-processor airflow-api-serverUruchom iot_ingress i potwierdź, że pipeline działa w external runtime.
Przed synchronizacją zmian na branch produkcyjny wykonaj co najmniej:
- lokalne
pytest, iot_ingress_dev,iot_ingressw tym modelu runtime, który ma być użyty docelowo.
- Local unit tests — gdy zmieniasz czystą logikę business runtime.
iot_ingress_dev— gdy iterujesz na zmianach i chcesz stabilny model 2.iot_ingress— gdy chcesz sprawdzić zachowanie DAG-a w modelu runtime skonfigurowanym dla instancji.
- DAG nie wykonuje IO na poziomie importu.
dag_lib/business_runtime/nie importujeairflow.*.- Połączenia i sekrety są odczytywane wyłącznie w
dag_lib/airflow_runtime/. - Duże dane nie przechodzą przez XCom.
- SQL transformujący działa w target DB.
- Requirements business runtime są jawne i wersjonowane.
- Wheelhouse jest częścią procesu release albo artefaktu deploymentowego.