"""Airflow DAG: ETL pipeline loading food-security data into BigQuery.

Kimball-style medallion flow (bronze -> silver):

    verify connection
        -> load FAO / World Bank / UNICEF raw sources (parallel, bronze)
        -> integrate sources into the staging table (silver)
        -> standardize, dedupe, enrich into the cleaned layer (silver)
"""
from datetime import datetime
import sys

from airflow import DAG
from airflow.operators.python import PythonOperator

# Make the project ETL scripts importable from inside the Airflow workers.
sys.path.insert(0, '/opt/airflow/scripts')

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'email_on_failure': False,
}


def _fao_indicator_list(client):
    """Load the FAO source and return its distinct indicator names.

    Shared by the World Bank and UNICEF loaders, which restrict their
    downloads to the indicator set that FAO provides.
    """
    from bigquery_raw_layer import FAODataSource
    df_fao = FAODataSource(client).run()
    return df_fao['indicator'].unique().tolist()


def task_verify_connection():
    """Fail fast if the BigQuery connection/setup is not usable."""
    # Task-local imports keep DAG-file parsing cheap on the scheduler.
    from bigquery_config import verify_setup
    result = verify_setup()
    if not result:
        raise Exception("BigQuery connection failed!")
    print("BigQuery connection OK")


def task_load_fao():
    """Load the FAO source into the bronze (raw) layer."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource
    client = get_bigquery_client()
    source = FAODataSource(client)
    df = source.run()
    print(f"FAO loaded: {len(df):,} rows")


def task_load_worldbank():
    """Load World Bank data (restricted to FAO's indicators) into bronze."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import WorldBankDataSource
    client = get_bigquery_client()
    fao_indicators = _fao_indicator_list(client)
    wb_source = WorldBankDataSource(client, fao_indicators)
    df = wb_source.run()
    print(f"World Bank loaded: {len(df):,} rows")


def task_load_unicef():
    """Load UNICEF data (restricted to FAO's indicators) into bronze."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import UNICEFDataSource
    client = get_bigquery_client()
    fao_indicators = _fao_indicator_list(client)
    unicef_source = UNICEFDataSource(client, fao_indicators)
    df = unicef_source.run()
    print(f"UNICEF loaded: {len(df):,} rows")


def task_staging_integration():
    """Integrate the bronze sources into the silver staging table."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import StagingDataIntegration
    client = get_bigquery_client()
    staging = StagingDataIntegration(client)
    df = staging.run()
    print(f"Staging integrated: {len(df):,} rows")


def task_cleaned_layer():
    """Build the cleaned silver layer: standardize, dedupe, enrich, load."""
    from bigquery_config import get_bigquery_client
    from bigquery_cleaned_layer import (
        load_staging_data,
        standardize_country_names_asean,
        assign_pillar,
        assign_direction,
        CleanedDataLoader,
    )

    client = get_bigquery_client()
    df_staging = load_staging_data(client)
    df_staging, _ = standardize_country_names_asean(
        df_staging, country_column='country'
    )

    # Drop rows with any missing value, then keep the first row per
    # (indicator, country, year) grain.
    critical_columns = list(df_staging.columns)
    df_no_missing = df_staging.dropna(subset=critical_columns)
    df_cleaned = df_no_missing.drop_duplicates(
        subset=['indicator_standardized', 'country', 'year'],
        keep='first',
    ).copy()  # .copy() avoids SettingWithCopyWarning on the assignments below

    df_cleaned['pillar'] = df_cleaned['indicator_standardized'].apply(assign_pillar)
    df_cleaned['direction'] = df_cleaned['indicator_standardized'].apply(
        assign_direction
    )

    loader = CleanedDataLoader(client, load_mode='full_refresh')
    final_count = loader.run(df_cleaned)
    print(f"Cleaned loaded: {final_count:,} rows")


with DAG(
    dag_id="etl_food_security_bigquery",
    start_date=datetime(2026, 3, 1),
    schedule_interval=None,  # manual trigger only
    catchup=False,
    default_args=default_args,
    tags=["food-security", "bigquery", "kimball"],
) as dag:
    verify = PythonOperator(
        task_id="verify_bigquery_connection",
        python_callable=task_verify_connection,
    )
    load_fao = PythonOperator(
        task_id="load_fao_to_bronze",
        python_callable=task_load_fao,
    )
    load_wb = PythonOperator(
        task_id="load_worldbank_to_bronze",
        python_callable=task_load_worldbank,
    )
    load_unicef = PythonOperator(
        task_id="load_unicef_to_bronze",
        python_callable=task_load_unicef,
    )
    staging = PythonOperator(
        task_id="staging_integration_to_silver",
        python_callable=task_staging_integration,
    )
    cleaned = PythonOperator(
        task_id="cleaned_layer_to_silver",
        python_callable=task_cleaned_layer,
    )

    # FIX: the original defined the six tasks but never set dependencies,
    # so Airflow would run them all concurrently — staging/cleaned could
    # start before the bronze loads finished. Enforce the pipeline order.
    verify >> [load_fao, load_wb, load_unicef] >> staging >> cleaned