diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py
index 4fe061a..7d562b2 100644
--- a/dags/etl_food_security.py
+++ b/dags/etl_food_security.py
@@ -56,37 +56,6 @@ def task_staging_integration():
     df = staging.run()
     print(f"Staging integrated: {len(df):,} rows")
 
-def task_cleaned_layer():
-    from bigquery_config import get_bigquery_client
-    from bigquery_cleaned_layer import (
-        load_staging_data,
-        standardize_country_names_asean,
-        assign_pillar,
-        assign_direction,
-        CleanedDataLoader
-    )
-    import pandas as pd
-
-    client = get_bigquery_client()
-    df_staging = load_staging_data(client)
-
-    df_staging, _ = standardize_country_names_asean(df_staging, country_column='country')
-
-    critical_columns = list(df_staging.columns)
-    df_no_missing = df_staging.dropna(subset=critical_columns)
-
-    df_cleaned = df_no_missing.drop_duplicates(
-        subset=['indicator_standardized', 'country', 'year'],
-        keep='first'
-    )
-
-    df_cleaned['pillar'] = df_cleaned['indicator_standardized'].apply(assign_pillar)
-    df_cleaned['direction'] = df_cleaned['indicator_standardized'].apply(assign_direction)
-
-    loader = CleanedDataLoader(client, load_mode='full_refresh')
-    final_count = loader.run(df_cleaned)
-    print(f"Cleaned loaded: {final_count:,} rows")
-
 with DAG(
     dag_id = "etl_food_security_bigquery",
     start_date = datetime(2026, 3, 1),
@@ -121,7 +90,4 @@ with DAG(
         python_callable= task_staging_integration
     )
 
-    cleaned = PythonOperator(
-        task_id = "cleaned_layer_to_silver",
-        python_callable= task_cleaned_layer
-    )
\ No newline at end of file
+    verify >> load_fao >> load_wb >> load_unicef >> staging
\ No newline at end of file