diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py
index 7d562b2..f3dda1a 100644
--- a/dags/etl_food_security.py
+++ b/dags/etl_food_security.py
@@ -1,93 +1,47 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
 from datetime import datetime
 
-import sys
-sys.path.insert(0, '/opt/airflow/scripts')
-
-default_args = {
-    'owner'           : 'airflow',
-    'retries'         : 1,
-    'email_on_failure': False,
-}
-
-def task_verify_connection():
-    from bigquery_config import verify_setup
-    result = verify_setup()
-    if not result:
-        raise Exception("BigQuery connection failed!")
-    print("BigQuery connection OK")
-
-def task_load_fao():
-    from bigquery_config import get_bigquery_client
-    from bigquery_raw_layer import FAODataSource
-    client = get_bigquery_client()
-    source = FAODataSource(client)
-    df = source.run()
-    print(f"FAO loaded: {len(df):,} rows")
-
-def task_load_worldbank():
-    from bigquery_config import get_bigquery_client
-    from bigquery_raw_layer import FAODataSource, WorldBankDataSource
-    client = get_bigquery_client()
-    fao_source = FAODataSource(client)
-    df_fao = fao_source.run()
-    fao_indicators = df_fao['indicator'].unique().tolist()
-    wb_source = WorldBankDataSource(client, fao_indicators)
-    df = wb_source.run()
-    print(f"World Bank loaded: {len(df):,} rows")
-
-def task_load_unicef():
-    from bigquery_config import get_bigquery_client
-    from bigquery_raw_layer import FAODataSource, UNICEFDataSource
-    client = get_bigquery_client()
-    fao_source = FAODataSource(client)
-    df_fao = fao_source.run()
-    fao_indicators = df_fao['indicator'].unique().tolist()
-    unicef_source = UNICEFDataSource(client, fao_indicators)
-    df = unicef_source.run()
-    print(f"UNICEF loaded: {len(df):,} rows")
-
-def task_staging_integration():
-    from bigquery_config import get_bigquery_client
-    from bigquery_raw_layer import StagingDataIntegration
-    client = get_bigquery_client()
-    staging = StagingDataIntegration(client)
-    df = staging.run()
-    print(f"Staging integrated: {len(df):,} rows")
+# Import fungsi dari folder scripts
+from scripts.bigquery_raw_layer import (
+    run_verify_connection,
+    run_load_fao,
+    run_load_worldbank,
+    run_load_unicef,
+    run_staging_integration,
+)
 
 with DAG(
-    dag_id           = "etl_food_security_bigquery",
-    start_date       = datetime(2026, 3, 1),
-    schedule_interval= None,
-    catchup          = False,
-    default_args     = default_args,
-    tags             = ["food-security", "bigquery", "kimball"]
+    dag_id            = "etl_food_security_bigquery",
+    start_date        = datetime(2026, 3, 1),
+    schedule_interval = "@daily",
+    catchup           = False,
+    tags              = ["food-security", "bigquery", "kimball"]
 ) as dag:
 
-    verify = PythonOperator(
-        task_id        = "verify_bigquery_connection",
-        python_callable= task_verify_connection
+    task_verify = PythonOperator(
+        task_id         = "verify_bigquery_connection",
+        python_callable = run_verify_connection
     )
 
-    load_fao = PythonOperator(
-        task_id        = "load_fao_to_bronze",
-        python_callable= task_load_fao
+    task_fao = PythonOperator(
+        task_id         = "load_fao_to_bronze",
+        python_callable = run_load_fao
     )
 
-    load_wb = PythonOperator(
-        task_id        = "load_worldbank_to_bronze",
-        python_callable= task_load_worldbank
+    task_worldbank = PythonOperator(
+        task_id         = "load_worldbank_to_bronze",
+        python_callable = run_load_worldbank
     )
 
-    load_unicef = PythonOperator(
-        task_id        = "load_unicef_to_bronze",
-        python_callable= task_load_unicef
+    task_unicef = PythonOperator(
+        task_id         = "load_unicef_to_bronze",
+        python_callable = run_load_unicef
     )
 
-    staging = PythonOperator(
-        task_id        = "staging_integration_to_silver",
-        python_callable= task_staging_integration
+    task_staging = PythonOperator(
+        task_id         = "staging_integration_to_silver",
+        python_callable = run_staging_integration
     )
 
-    verify >> load_fao >> load_wb >> load_unicef >> staging
\ No newline at end of file
+    task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging
\ No newline at end of file