From 92805b9dbd75a30fc683fc6b9d8ba844cb2e173d Mon Sep 17 00:00:00 2001
From: Debby
Date: Sat, 7 Mar 2026 15:57:38 +0700
Subject: [PATCH] add airflow task functions

---
Review notes (text between the three-dash line and the diffstat is not part
of the commit message):

* Line breaks were restored from a copy in which the whole patch had been
  collapsed onto two lines. Indentation and alignment spaces inside context
  and removed lines could not be recovered; 4-space indentation is assumed.
  If the context does not match, apply with `git apply --ignore-whitespace`.
* The post-image blob id on the `index` line was not recomputed after the
  changes below.
* The first hunk removes the "Schema staging_integrated" listing from the
  StagingDataIntegration docstring. That is unrelated to the subject line;
  confirm the removal is intended.
* The task functions are appended after the `if __name__ == "__main__":`
  block, so a script run executes that block before they are defined.
* Changes relative to the submitted patch: the FAO-indicator lookup that was
  duplicated in run_load_worldbank and run_load_unicef now lives in
  _load_fao_indicators; run_verify_connection raises RuntimeError instead of
  a bare Exception; docstrings were added; the file now ends with a newline.

 scripts/bigquery_raw_layer.py | 71 +++++++++++++++++++++++++++++++++++------
 1 file changed, 60 insertions(+), 11 deletions(-)

diff --git a/scripts/bigquery_raw_layer.py b/scripts/bigquery_raw_layer.py
index 177e4f0..8f7596e 100644
--- a/scripts/bigquery_raw_layer.py
+++ b/scripts/bigquery_raw_layer.py
@@ -426,16 +426,6 @@ class StagingDataIntegration:
     Input : RAW layer (Bronze) — raw_fao, raw_worldbank, raw_unicef
     Output : STAGING layer (Silver) — staging_integrated
     Audit : etl_logs, etl_metadata (Audit → fs_asean_audit)
-
-    Schema staging_integrated:
-        source varchar(20)
-        indicator_original varchar(255)
-        indicator_standardized varchar(255)
-        country varchar(100)
-        year int
-        year_range varchar(20)
-        value float
-        unit varchar(20)
     """
 
     def __init__(self, client: bigquery.Client):
@@ -798,4 +788,63 @@ if __name__ == "__main__":
     print(f"RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
     print(f"STAGING (Silver) : staging_integrated")
     print(f"AUDIT : etl_logs, etl_metadata")
-    print("=" * 60)
\ No newline at end of file
+    print("=" * 60)
+
+
+# AIRFLOW TASK FUNCTIONS
+
+
+def _load_fao_indicators(client):
+    """Run the FAO source and return its unique 'indicator' values as a list.
+
+    NOTE(review): this re-runs FAODataSource on every call, so the World Bank
+    and UNICEF tasks each repeat the FAO run -- confirm that is acceptable.
+    """
+    df_fao = FAODataSource(client).run()
+    return df_fao['indicator'].unique().tolist()
+
+
+def run_verify_connection():
+    """Check the BigQuery setup and fail the task if verification is falsy.
+
+    Raises:
+        RuntimeError: if verify_setup() returns a falsy value.
+    """
+    from scripts.bigquery_config import verify_setup
+    if not verify_setup():
+        raise RuntimeError("BigQuery connection failed!")
+    print("BigQuery connection OK")
+
+
+def run_load_fao():
+    """Run the FAO data source and print the number of rows it returned."""
+    from scripts.bigquery_config import get_bigquery_client
+    client = get_bigquery_client()
+    df = FAODataSource(client).run()
+    print(f"FAO loaded: {len(df):,} rows")
+
+
+def run_load_worldbank():
+    """Run the World Bank source, seeded with the FAO indicator list."""
+    from scripts.bigquery_config import get_bigquery_client
+    client = get_bigquery_client()
+    fao_indicators = _load_fao_indicators(client)
+    df = WorldBankDataSource(client, fao_indicators).run()
+    print(f"World Bank loaded: {len(df):,} rows")
+
+
+def run_load_unicef():
+    """Run the UNICEF source, seeded with the FAO indicator list."""
+    from scripts.bigquery_config import get_bigquery_client
+    client = get_bigquery_client()
+    fao_indicators = _load_fao_indicators(client)
+    df = UNICEFDataSource(client, fao_indicators).run()
+    print(f"UNICEF loaded: {len(df):,} rows")
+
+
+def run_staging_integration():
+    """Run the staging integration and print the number of rows it returned."""
+    from scripts.bigquery_config import get_bigquery_client
+    client = get_bigquery_client()
+    df = StagingDataIntegration(client).run()
+    print(f"Staging integrated: {len(df):,} rows")