End-to-end ML pipeline that ingests Argentina's unconventional oil & gas well production data, engineers features, stores them in a versioned MySQL feature store, trains Random Forest models tracked with MLflow, and promotes the best model to production.
Data Source (API) -> Airflow (Ingest DAG) -> MySQL Feature Store -> Airflow (Train DAG) -> MLflow Model Registry -> Forecast API
| Service | Port | Description |
|---|---|---|
| Airflow UI | 8080 | DAG orchestration and monitoring |
| MLflow UI | 9191 | Experiment tracking and model registry |
| Forecast API | 8000 | REST API for wells and production forecasts |
| MySQL | 3307 | Versioned feature store |
| PostgreSQL | 5432 | Airflow metadata database |
| Redis | 6379 | Celery broker |
Ingestion pipeline that runs the following tasks sequentially:
- `download_dataset` - Downloads the raw CSV from Argentina's energy open data portal
- `preprocess` - Cleans data, encodes categorical variables, and engineers features using a 10-reading sliding window per well (rolling averages, last values)
- `insert_into_online_store` - Versions the feature store (`v1_feature_store`, `v2_feature_store`, ...) and writes processed data to MySQL
- `delete_raw_data` - Cleans up temporary CSV files
- `trigger_train_dag` - Triggers the training DAG via `TriggerDagRunOperator`
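As a sketch of the sliding-window step, the per-well features could be computed with pandas like this (column names follow the feature store schema below; the actual `preprocess` task may differ in details):

```python
import pandas as pd

def engineer_features(df: pd.DataFrame, window: int = 10) -> pd.DataFrame:
    """Build one feature row per well from its last `window` readings."""
    df = df.sort_values(["id_well", "date"])
    # Keep only the most recent `window` readings per well
    tail = df.groupby("id_well").tail(window)
    g = tail.groupby("id_well")
    feats = pd.DataFrame({
        "avg_prod_gas_10m": g["prod_gas"].mean(),
        "avg_prod_pet_10m": g["prod_pet"].mean(),
        "last_prod_gas": g["prod_gas"].last(),
        "last_prod_pet": g["prod_pet"].last(),
        "n_readings": g["prod_gas"].size(),
    })
    return feats.reset_index()
```

Each well therefore contributes a single pre-computed row, which is what the forecast API later reads back from the online store.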
Training and model promotion pipeline:
- `train_with_online_feature_store` - Reads the latest feature store version from MySQL, trains 3 Random Forest regressors (`n_estimators` = 25, 50, 100) to predict `prod_pet` (oil production), and logs parameters, metrics (MAE, MSE, R2), and model artifacts to MLflow
- `promote_best_model` - Selects the best model by lowest MSE, registers it in the MLflow model registry, and tags it with the `production` alias
The feature store is versioned in MySQL. Each ingestion creates a new table (`v1_feature_store`, `v2_feature_store`, etc.) and updates the `fs_metadata` table with the latest version.
| Feature | Description |
|---|---|
| `avg_prod_gas_10m` | Average gas production over last 10 readings |
| `avg_prod_pet_10m` | Average oil production over last 10 readings |
| `last_prod_gas` | Most recent gas production reading |
| `last_prod_pet` | Most recent oil production reading |
| `n_readings` | Number of readings in the window |
| `tipoextraccion` | Extraction type (label encoded) |
- Docker and Docker Compose
```bash
docker compose up -d
```

Configuration is managed via the `.env` file:
| Variable | Description |
|---|---|
| `AIRFLOW_UID` | User ID for Airflow containers |
| `MYSQL_USER` | MySQL username |
| `MYSQL_PASSWORD` | MySQL password |
| `MYSQL_HOST` | MySQL host |
| `MYSQL_PORT` | MySQL port |
| `MYSQL_DATABASE` | MySQL database name |
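For example, a minimal `.env` might look like this (all values below are illustrative placeholders, including the host and database names; substitute your own):

```shell
AIRFLOW_UID=50000
MYSQL_USER=mlops
MYSQL_PASSWORD=change_me
MYSQL_HOST=mysql
MYSQL_PORT=3307
MYSQL_DATABASE=feature_store
```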
```bash
docker compose up -d
```

Wait a couple of minutes for all services to initialize. You can check the status with:

```bash
docker compose ps
```

All services should show as healthy before proceeding.
Open http://localhost:8080 and log in with:
- Username: `airflow`
- Password: `airflow`
- In the Airflow UI, find the `ingest_dag` DAG
- Unpause it by toggling the switch on the left
- Click the Play button to trigger a manual run
- Monitor progress in the Graph view; each task will turn green as it completes
This will:
- Download the raw dataset (~1-2 min depending on network)
- Preprocess and engineer features
- Create a versioned table in MySQL (e.g., `v1_feature_store`)
- Clean up temporary files
- Automatically trigger the training DAG
The `train_with_online_feature_store_and_promote_best_model` DAG is triggered automatically after ingestion. It will:
- Read the latest feature store version from MySQL
- Train 3 Random Forest models with different `n_estimators` (25, 50, 100)
- Log all metrics and artifacts to MLflow
- Promote the best model (lowest MSE) to the MLflow model registry with the `production` alias
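The candidate training and selection can be sketched as follows (illustrative only; the real task also reads the feature store from MySQL and logs each candidate run to MLflow):

```python
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def train_candidates(X_train, y_train, X_val, y_val):
    """Train the three candidate regressors and return the lowest-MSE one."""
    results = []
    for n in (25, 50, 100):
        model = RandomForestRegressor(n_estimators=n, random_state=42)
        model.fit(X_train, y_train)
        mse = mean_squared_error(y_val, model.predict(X_val))
        results.append({"n_estimators": n, "mse": mse, "model": model})
    # promote_best_model registers the candidate with the lowest MSE
    return min(results, key=lambda r: r["mse"])
```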
Open http://localhost:9191 to:
- Compare model runs under the `energy_experiment` experiment
- View metrics (MAE, MSE, R2) for each model
- Check the Models tab to see the registered production model
To use the promoted model in your application:
```python
import mlflow

model = mlflow.sklearn.load_model("models:/rf_prod_pet@production")
predictions = model.predict(X_new)
```

To inspect which model is currently in production and trace it back to its training run:
```python
from mlflow.tracking import MlflowClient

client = MlflowClient()
mv = client.get_model_version_by_alias("rf_prod_pet", "production")
run = client.get_run(mv.run_id)

print("Model version :", mv.version)
print("Run ID        :", mv.run_id)
print("Feature table :", run.data.params["feature_store_table"])
print("n_estimators  :", run.data.params["n_estimators"])
print("MSE           :", run.data.metrics["mse"])
print("R2            :", run.data.metrics["r2"])
```

This allows you to reproduce any past model training by pairing the run ID with the exact feature store version used.
The API is available at http://localhost:8000 once the stack is running. Interactive docs (Swagger UI) are at http://localhost:8000/docs.
Note: The API requires the ingest and training pipelines to have run at least once before serving forecasts.
| Method | Path | Description |
|---|---|---|
| GET | `/health` | Service health check |
| GET | `/api/v1/wells` | List of active wells as of a given date |
| GET | `/api/v1/forecast` | Production forecast for a well over a date range |
| GET | `/api/v1/model` | Current production model metadata and training provenance |
Returns distinct wells that have production records on or before `date_query`.
```bash
curl "http://localhost:8000/api/v1/wells?date_query=2024-01-01"
```

```json
[
  {"id_well": "30000000003011"},
  {"id_well": "30000000003012"}
]
```

Returns the predicted monthly oil production (`prod_pet`) for a well between `date_start` and `date_end`. Features are sourced from the well's online store record (the pre-computed row for the next inference period).
```bash
curl "http://localhost:8000/api/v1/forecast?id_well=30000000003011&date_start=2024-01-01&date_end=2024-06-01"
```

```json
{
  "id_well": "30000000003011",
  "data": [
    {"date": "2024-01-01", "prod": 142.7},
    {"date": "2024-02-01", "prod": 142.7},
    {"date": "2024-03-01", "prod": 142.7}
  ]
}
```

Returns metadata of the current production model for traceability: which feature store version was used for training, the MLflow run ID, hyperparameters, and evaluation metrics.
```bash
curl "http://localhost:8000/api/v1/model"
```

```json
{
  "model_name": "rf_prod_pet",
  "model_version": "3",
  "alias": "production",
  "run_id": "a1b2c3d4e5f6...",
  "feature_store_version": "v2",
  "feature_store_table": "v2_feature_store",
  "target": "prod_pet",
  "n_estimators": 100,
  "metrics": {
    "mae": 12.4,
    "mse": 430.1,
    "r2": 0.87
  }
}
```

Each time you trigger `ingest_dag`, a new feature store version is created (v1, v2, v3, ...) and new models are trained against the latest data. The best model is re-evaluated and the `production` alias is updated if a better model is found.
Unconventional Oil & Gas Well Production - Argentina Energy Open Data
- Flores Jorge Federico - jfflores90@gmail.com
- Nicolas Velazquez - nicoj.velazquez@gmail.com