diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index 977a0d4..5494731 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -421,8 +421,8 @@ class FoodSecurityAggregator: self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") self.logger.info("=" * 70) - self.df = read_from_bigquery(self.client, "analytical_food_security", layer='gold') - self.logger.info(f" analytical_food_security : {len(self.df):,} rows") + self.df = read_from_bigquery(self.client, "fact_asean_food_security_selected", layer='gold') + self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows") self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold') self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold') @@ -1307,7 +1307,7 @@ class FoodSecurityAggregator: def run_aggregation(): """ - Airflow task: Hitung semua agregasi dari analytical_food_security. + Airflow task: Hitung semua agregasi dari fact_asean_food_security_selected. Dipanggil setelah analytical_layer_to_gold selesai. Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative. """ diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 6543564..018be28 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -1,6 +1,6 @@ """ BIGQUERY ANALYTICAL LAYER - DATA FILTERING -FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold') +FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) @@ -8,7 +8,7 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Save analytical table (value only, normalisasi & direction handled downstream) +6. Save analytical table (dengan nama/label lengkap untuk Looker Studio) """ import pandas as pd @@ -40,15 +40,15 @@ from google.cloud import bigquery class AnalyticalLayerLoader: """ - Analytical Layer Loader for BigQuery - CORRECTED VERSION v4 + Analytical Layer Loader for BigQuery Key Logic: 1. Complete per country (no gaps from start_year to end_year) 2. Filter countries with all pillars 3. Ensure indicators have consistent country count across all years - 4. Save raw value only (normalisasi & direction handled downstream) + 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio - Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold + Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold """ def __init__(self, client: bigquery.Client): @@ -424,33 +424,65 @@ class AnalyticalLayerLoader: return year_stats def save_analytical_table(self): - table_name = 'analytical_food_security' + # --------------------------------------------------------------- + # CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio + # --------------------------------------------------------------- + table_name = 'fact_asean_food_security_selected' + self.logger.info("\n" + "=" * 80) self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: + # ------------------------------------------------------------------ + # Pilih kolom: ID + Nama lengkap + value + # Kolom nama memudahkan filtering/slicing langsung di Looker Studio + # tanpa perlu join ulang ke tabel dimensi. + # ------------------------------------------------------------------ analytical_df = self.df_clean[[ - 'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value' + 'country_id', + 'country_name', + 'indicator_id', + 'indicator_name', + 'direction', + 'pillar_id', + 'pillar_name', + 'time_id', + 'year', + 'value', ]].copy() + analytical_df = analytical_df.sort_values( - ['time_id', 'country_id', 'indicator_id'] + ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) + # Pastikan tipe data konsisten + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['country_name'] = analytical_df['country_name'].astype(str) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) + analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['year'] = analytical_df['year'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) + self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") + # Schema BigQuery schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -475,7 +507,8 @@ class AnalyticalLayerLoader: 'end_year' : self.end_year, 'fixed_countries': len(self.selected_country_ids), 'no_gaps' : True, - 'layer' : 'gold' + 'layer' : 'gold', + 'columns' : 'id + name + value (Looker Studio ready)' }), 'validation_metrics' : json.dumps({ 'fixed_countries' : len(self.selected_country_ids), @@ -497,7 +530,7 @@ class AnalyticalLayerLoader: self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: analytical_food_security → fs_asean_gold") + self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold") self.logger.info("=" * 80) self.load_source_data() @@ -528,7 +561,7 @@ class AnalyticalLayerLoader: def run_analytical_layer(): """ - Airflow task: Build analytical_food_security dari fact_food_security + dims. + Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims. Dipanggil setelah dimensional_model_to_gold selesai. """ from scripts.bigquery_config import get_bigquery_client @@ -544,7 +577,7 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("Output: analytical_food_security → fs_asean_gold") + print("Output: fact_asean_food_security_selected → fs_asean_gold") print("=" * 80) logger = setup_logging()