87 lines
2.4 KiB
Python
87 lines
2.4 KiB
Python
from airflow import DAG
|
|
from airflow.operators.python import PythonOperator
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import functions from the scripts folder
|
|
from scripts.bigquery_raw_layer import (
|
|
run_verify_connection,
|
|
run_load_fao,
|
|
run_load_worldbank,
|
|
run_load_unicef,
|
|
run_staging_integration,
|
|
)
|
|
|
|
from scripts.bigquery_cleaned_layer import (
|
|
run_cleaned_integration,
|
|
)
|
|
|
|
from scripts.bigquery_dimesional_model import (
|
|
run_dimensional_model,
|
|
)
|
|
|
|
from scripts.bigquery_analytical_layer import (
|
|
run_analytical_layer,
|
|
)
|
|
|
|
from scripts.bigquery_aggregate_layer import (
|
|
run_aggregation,
|
|
)
|
|
|
|
|
|
# DAG "etl_food_security_bigquery": runs the imported pipeline callables as
# nine PythonOperator tasks chained strictly one after another. The task ids
# label each step with its target layer (bronze, silver, gold).
with DAG(
    dag_id="etl_food_security_bigquery",
    description="Kimball ETL: FAO, World Bank, UNICEF to BigQuery (Bronze to Silver to Gold)",
    start_date=datetime(2026, 3, 1),
    # The call previously passed schedule_interval twice ("@daily" and
    # timedelta(days=3)); Python rejects a repeated keyword argument with a
    # SyntaxError, so the DAG file could not be imported at all. Only the
    # timedelta value is kept: it was the later of the two and is the sole
    # use of the imported `timedelta`.
    # NOTE(review): confirm that a 3-day cadence (not "@daily") is intended.
    schedule_interval=timedelta(days=3),
    # Do not backfill runs for intervals between start_date and today.
    catchup=False,
    tags=["food-security", "bigquery", "kimball"],
) as dag:

    # Connectivity check; every other task is downstream of it.
    task_verify = PythonOperator(
        task_id="verify_bigquery_connection",
        python_callable=run_verify_connection,
    )

    # Source loads (task ids mark these as the bronze layer).
    task_fao = PythonOperator(
        task_id="load_fao_to_bronze",
        python_callable=run_load_fao,
    )

    task_worldbank = PythonOperator(
        task_id="load_worldbank_to_bronze",
        python_callable=run_load_worldbank,
    )

    task_unicef = PythonOperator(
        task_id="load_unicef_to_bronze",
        python_callable=run_load_unicef,
    )

    # Integration steps (task ids mark these as the silver layer).
    task_staging = PythonOperator(
        task_id="staging_integration_to_silver",
        python_callable=run_staging_integration,
    )

    task_cleaned = PythonOperator(
        task_id="cleaned_integration_to_silver",
        python_callable=run_cleaned_integration,
    )

    # Modelling, analytical and aggregation steps (gold layer per task ids).
    task_dimensional = PythonOperator(
        task_id="dimensional_model_to_gold",
        python_callable=run_dimensional_model,
    )

    task_analytical = PythonOperator(
        task_id="analytical_layer_to_gold",
        python_callable=run_analytical_layer,
    )

    task_aggregation = PythonOperator(
        task_id="aggregation_to_gold",
        python_callable=run_aggregation,
    )

    # Strictly linear execution order: each task starts only after the
    # previous one in the chain has succeeded.
    (
        task_verify
        >> task_fao
        >> task_worldbank
        >> task_unicef
        >> task_staging
        >> task_cleaned
        >> task_dimensional
        >> task_analytical
        >> task_aggregation
    )