"""
Airflow DAG — ETL Food Security to BigQuery (Kimball data-warehouse architecture).

Schedule : every 3 days of the month (cron "0 0 */3 * *")
Catchup  : False

Kimball ETL flow (Bronze -> Silver -> Gold):

    BRONZE (raw)          SILVER (staging -> cleaned)    GOLD (DW -> analytical)

    raw_fao       --+                                    dim_country
    raw_worldbank --+--> staging_integrated              dim_indicator
    raw_unicef    --+           |                        dim_time
                                v                        dim_source
                         cleaned_integrated -----------> dim_pillar
                                                         fact_food_security
                                                               |
                                                               v
                                                         analytical_food_security
                                                               |
                                                               v
                                                         agg_pillar_composite
                                                         agg_pillar_by_country
                                                         agg_framework_by_country
                                                         agg_framework_asean

    AUDIT: etl_logs, etl_metadata (written at every layer)

Task order (strictly linear):
    verify_bigquery_connection
      -> load_fao_to_bronze
      -> load_worldbank_to_bronze
      -> load_unicef_to_bronze
      -> staging_integration_to_silver
      -> cleaned_integration_to_silver
      -> dimensional_model_to_gold
      -> analytical_layer_to_gold
      -> aggregation_to_gold

The scripts/ folder must contain:
    - bigquery_raw_layer.py         (run_verify_connection, run_load_fao, ...)
    - bigquery_cleaned_layer.py     (run_cleaned_integration)
    - bigquery_dimensional_model.py (run_dimensional_model)
    - bigquery_analytical_layer.py  (run_analytical_layer)
    - bigquery_aggregate_layer.py   (run_aggregation)
    - bigquery_config.py
    - bigquery_helpers.py
    - bigquery_datasource.py
"""

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from scripts.bigquery_raw_layer import (
    run_verify_connection,
    run_load_fao,
    # NOTE(review): this name sits in a truncated diff-context region — confirm
    # against the original file that it is spelled run_load_worldbank.
    run_load_worldbank,
    run_load_unicef,
    run_staging_integration,
)
from scripts.bigquery_cleaned_layer import (
    run_cleaned_integration,
)
from scripts.bigquery_dimensional_model import (
    run_dimensional_model,
)
from scripts.bigquery_analytical_layer import (
    run_analytical_layer,
)
from scripts.bigquery_aggregate_layer import (
    run_aggregation,
)

# DEFAULT ARGS — inherited by every task in the DAG.
default_args = {
    'owner': 'data-engineering',
    'email': ['d1041221004@student.untan.ac.id'],
}

# DAG DEFINITION
with DAG(
    dag_id = "etl_food_security_bigquery",
    description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
    default_args = default_args,
    start_date = datetime(2026, 3, 1),
    # NOTE: day-of-month "*/3" fires on days 1, 4, 7, ..., 31 and resets at each
    # month boundary, so this is not a strict 72-hour cadence. If an exact
    # every-3-days interval is required, use timedelta(days=3) instead.
    schedule_interval = "0 0 */3 * *",
    catchup = False,
    tags = ["food-security", "bigquery", "kimball"],
) as dag:

    # --- Bronze: raw ingestion -------------------------------------------
    task_verify = PythonOperator(
        task_id = "verify_bigquery_connection",
        python_callable = run_verify_connection
    )

    # NOTE(review): the three bronze-load operators below sit between diff
    # hunks; task_ids come from the documented task order and the dependency
    # chain — confirm the python_callable names against the original file.
    task_fao = PythonOperator(
        task_id = "load_fao_to_bronze",
        python_callable = run_load_fao
    )

    task_worldbank = PythonOperator(
        task_id = "load_worldbank_to_bronze",
        python_callable = run_load_worldbank
    )

    task_unicef = PythonOperator(
        task_id = "load_unicef_to_bronze",
        python_callable = run_load_unicef
    )

    # --- Silver: staging integration + cleaning --------------------------
    task_staging = PythonOperator(
        task_id = "staging_integration_to_silver",
        python_callable = run_staging_integration
    )

    task_cleaned = PythonOperator(
        task_id = "cleaned_integration_to_silver",
        python_callable = run_cleaned_integration
    )

    # --- Gold: dimensional model, analytical layer, aggregations ---------
    task_dimensional = PythonOperator(
        task_id = "dimensional_model_to_gold",
        python_callable = run_dimensional_model
    )

    task_analytical = PythonOperator(
        task_id = "analytical_layer_to_gold",
        python_callable = run_analytical_layer
    )

    task_aggregation = PythonOperator(
        task_id = "aggregation_to_gold",
        python_callable = run_aggregation
    )

    # Strictly linear pipeline: the three source loads run sequentially,
    # not in parallel, before the silver and gold stages.
    (
        task_verify
        >> task_fao
        >> task_worldbank
        >> task_unicef
        >> task_staging
        >> task_cleaned
        >> task_dimensional
        >> task_analytical
        >> task_aggregation
    )