This commit is contained in:
Debby
2026-03-15 00:07:47 +07:00
parent 27ac14ad9b
commit 4b617a1e8f

View File

@@ -1,8 +1,57 @@
"""
AIRFLOW DAG — ETL Food Security BigQuery
Kimball Data Warehouse Architecture
Schedule : Setiap 3 hari sekali (timedelta(days=3))
Catchup : False
Kimball ETL Flow:
┌──────────────────────────────────────────────────────────────────────────┐
│ BRONZE (Raw) SILVER (Staging→Cleaned) GOLD (DW → Analytical) │
│ │
│ raw_fao ─┐ dim_country │
│ raw_worldbank ─┼→ staging_integrated dim_indicator │
│ raw_unicef ─┘ ↓ dim_time │
│ cleaned_integrated ───────→ dim_source │
│ dim_pillar │
│ fact_food_security │
│ ↓ │
│ analytical_food_security │
│ ↓ │
│ agg_pillar_composite │
│ agg_pillar_by_country │
│ agg_framework_by_country │
│ agg_framework_asean │
│ │
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
└──────────────────────────────────────────────────────────────────────────┘
Task Order:
verify_bigquery_connection
→ load_fao_to_bronze
→ load_worldbank_to_bronze
→ load_unicef_to_bronze
→ staging_integration_to_silver
→ cleaned_integration_to_silver
→ dimensional_model_to_gold
→ analytical_layer_to_gold
→ aggregation_to_gold
Scripts folder harus berisi:
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
- bigquery_cleaned_layer.py (run_cleaned_integration)
- bigquery_dimensional_model.py (run_dimensional_model)
- bigquery_analytical_layer.py (run_analytical_layer)
- bigquery_aggregate_layer.py (run_aggregation)
- bigquery_config.py
- bigquery_helpers.py
- bigquery_datasource.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
# BUG FIX: timedelta restored — it is required by schedule_interval below,
# but the newer diff side had dropped it from this import.
from datetime import datetime, timedelta

# Import task callables from the scripts/ folder (see module docstring).
from scripts.bigquery_raw_layer import (
    run_verify_connection,
    run_load_fao,
    run_load_worldbank,  # NOTE(review): hidden in the diff hunk — name inferred from docstring task order; confirm.
    run_load_unicef,
    run_staging_integration,
)
from scripts.bigquery_cleaned_layer import (
    run_cleaned_integration,
)
# BUG FIX: module name corrected — the diff also contained the misspelled
# "bigquery_dimesional_model"; the docstring confirms this spelling.
from scripts.bigquery_dimensional_model import (
    run_dimensional_model,
)
from scripts.bigquery_analytical_layer import (
    run_analytical_layer,
)
# NOTE(review): docstring mentions "bigquery_analysis_aggregation.py", but the
# code imports "bigquery_aggregate_layer" — confirm the real module name.
from scripts.bigquery_aggregate_layer import (
    run_aggregation,
)

# DEFAULT ARGS — applied to every task in this DAG.
default_args = {
    'owner': 'data-engineering',
    'email': ['d1041221004@student.untan.ac.id'],
}

# DAG DEFINITION
# Runs every 3 days (per the module docstring); catchup=False so missed
# intervals since start_date are not back-filled.
with DAG(
    dag_id = "etl_food_security_bigquery",
    description = "Kimball ETL: FAO, World Bank, UNICEF to BigQuery (Bronze to Silver to Gold)",
    default_args = default_args,
    start_date = datetime(2026, 3, 1),
    # BUG FIX: the diff left TWO schedule_interval kwargs ("0 0 */3 * *" and
    # timedelta(days=3)) — a duplicate-keyword SyntaxError. timedelta(days=3)
    # is kept, matching the documented "every 3 days" schedule.
    schedule_interval = timedelta(days=3),
    catchup = False,
    tags = ["food-security", "bigquery", "kimball"],
) as dag:

    # --- Bronze layer: raw source loads ------------------------------------
    task_verify = PythonOperator(
        task_id = "verify_bigquery_connection",
        python_callable = run_verify_connection
    )
    # NOTE(review): the three loader tasks below were hidden inside a diff
    # hunk; task_ids reconstructed from the documented task order — confirm.
    task_fao = PythonOperator(
        task_id = "load_fao_to_bronze",
        python_callable = run_load_fao
    )
    task_worldbank = PythonOperator(
        task_id = "load_worldbank_to_bronze",
        python_callable = run_load_worldbank
    )
    task_unicef = PythonOperator(
        task_id = "load_unicef_to_bronze",
        python_callable = run_load_unicef
    )

    # --- Silver layer: staging + cleaning ----------------------------------
    task_staging = PythonOperator(
        task_id = "staging_integration_to_silver",
        python_callable = run_staging_integration
    )
    task_cleaned = PythonOperator(
        task_id = "cleaned_integration_to_silver",
        python_callable = run_cleaned_integration
    )

    # --- Gold layer: dimensional model, analytics, aggregates --------------
    task_dimensional = PythonOperator(
        task_id = "dimensional_model_to_gold",
        python_callable = run_dimensional_model
    )
    task_analytical = PythonOperator(
        task_id = "analytical_layer_to_gold",
        python_callable = run_analytical_layer
    )
    task_aggregation = PythonOperator(
        task_id = "aggregation_to_gold",
        python_callable = run_aggregation
    )

    # Strictly linear pipeline: Bronze -> Silver -> Gold (matches docstring).
    task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation