Files
airflow-coolify/dags/etl_food_security.py
2026-03-14 23:04:11 +07:00

59 lines
1.6 KiB
Python

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Task callables for the BigQuery ETL (raw/bronze ingest -> staging -> cleaned).
from scripts.bigquery_raw_layer import (
    run_verify_connection,
    run_load_fao,
    run_load_worldbank,
    run_load_unicef,
    run_staging_integration,
)
from scripts.bigquery_cleaned_layer import run_cleaned_integration

# Daily food-security ETL into BigQuery, layered Kimball-style:
# verify connection -> load three bronze sources -> staging -> cleaned.
with DAG(
    dag_id="etl_food_security_bigquery",
    start_date=datetime(2026, 3, 1),
    # NOTE(review): `schedule_interval` is deprecated in Airflow 2.4+ and
    # removed in 3.x in favor of `schedule` — confirm the installed version.
    schedule_interval="@daily",
    catchup=False,  # do not backfill runs between start_date and now
    tags=["food-security", "bigquery", "kimball"],
) as dag:
    # Fail fast if BigQuery credentials/connectivity are broken.
    task_verify = PythonOperator(
        task_id="verify_bigquery_connection",
        python_callable=run_verify_connection,
    )
    # Source ingests into the bronze (raw) layer.
    task_fao = PythonOperator(
        task_id="load_fao_to_bronze",
        python_callable=run_load_fao,
    )
    task_worldbank = PythonOperator(
        task_id="load_worldbank_to_bronze",
        python_callable=run_load_worldbank,
    )
    task_unicef = PythonOperator(
        task_id="load_unicef_to_bronze",
        python_callable=run_load_unicef,
    )
    # Integrate bronze sources into the silver (staging) layer.
    task_staging = PythonOperator(
        task_id="staging_integration_to_silver",
        python_callable=run_staging_integration,
    )
    # Clean/conform the staged data within the silver layer.
    task_cleaned = PythonOperator(
        task_id="cleaned_integration_to_silver",
        python_callable=run_cleaned_integration,
    )

    # Strictly sequential chain, preserved from the original. The three
    # bronze loads look independent and could likely fan out in parallel
    # (verify >> [fao, worldbank, unicef] >> staging), but confirm they do
    # not contend on shared tables before reordering.
    task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned