diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index 954f188..cd10e73 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -1,11 +1,7 @@ """ BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION -Tabel 1: agg_indicator_norm -> fs_asean_gold -Tabel 2: agg_narrative_indicator -> fs_asean_gold +Tabel: agg_indicator_norm -> fs_asean_gold -============================================================================= -agg_indicator_norm -============================================================================= Tujuan: Menghitung norm_value per indikator per negara per tahun, sehingga dapat melihat performa setiap indikator secara individual (lower_better & higher_better @@ -39,27 +35,6 @@ Output Schema (agg_indicator_norm): yoy_value, -- perubahan absolut value YoY yoy_norm_value, -- perubahan absolut norm_value YoY performance -- "Good" | "Bad" | null - -============================================================================= -agg_narrative_indicator -============================================================================= -Tujuan: - Menghasilkan narasi otomatis 1 paragraf per indikator per tahun di level ASEAN - (rata-rata seluruh negara ASEAN), dijalankan otomatis setelah agg_indicator_norm - selesai dalam pipeline yang sama. - -Granularity: - year x indicator_id (level ASEAN, bukan per negara) - -Output Schema (agg_narrative_indicator): - year, indicator_id, indicator_name, unit, direction, - pillar_name, framework, - avg_value, -- rata-rata value ASEAN - avg_norm_score_1_100, -- rata-rata norm_score_1_100 ASEAN - performance, -- Good | Bad | null - yoy_avg_value, -- perubahan avg_value vs tahun sebelumnya - n_countries, -- jumlah negara yang punya data tahun ini - narrative -- 1 paragraf narasi otomatis """ import pandas as pd @@ -138,7 +113,7 @@ _PERFORMANCE_THRESHOLD: float = 60.0 # ============================================================================= -# PURE HELPERS — agg_indicator_norm +# PURE HELPERS # ============================================================================= def _should_invert(direction: str, logger=None, context: str = "") -> bool: @@ -199,141 +174,6 @@ def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: return df -# ============================================================================= -# PURE HELPERS — agg_narrative_indicator -# ============================================================================= - -def _is_lower_better(direction: str) -> bool: - return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS - - -def _format_value(value: float, unit: str) -> str: - """Format nilai dengan unit yang sesuai.""" - if pd.isna(value): - return "N/A" - unit = str(unit).strip() if unit else "" - - if abs(value) >= 1000: - formatted = f"{value:,.1f}" - elif abs(value) >= 10: - formatted = f"{value:.2f}" - else: - formatted = f"{value:.3f}" - - return f"{formatted} {unit}".strip() - - -def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple: - """ - Kembalikan (direction_word, change_desc, is_positive_trend). - is_positive_trend: True jika perubahan menguntungkan sesuai direction. - """ - unit = str(unit).strip() if unit else "" - abs_yoy = abs(yoy) - - if abs_yoy >= 1000: - yoy_str = f"{abs_yoy:,.1f}" - elif abs_yoy >= 10: - yoy_str = f"{abs_yoy:.2f}" - else: - yoy_str = f"{abs_yoy:.3f}" - - change_desc = f"{yoy_str} {unit}".strip() - is_positive = (yoy < 0) if lower_better else (yoy > 0) - direction_word = "decreased by" if yoy < 0 else "increased by" - - return direction_word, change_desc, is_positive - - -def _build_narrative(row: pd.Series) -> str: - """ - Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id). - """ - year = int(row["year"]) - ind_name = str(row["indicator_name"]).strip() - unit = str(row["unit"]).strip() if row["unit"] else "" - direction = str(row["direction"]).strip() - pillar = str(row["pillar_name"]).strip() - framework = str(row["framework"]).strip() - avg_val = row["avg_value"] - avg_score = row["avg_norm_score_1_100"] - performance = row["performance"] - yoy = row["yoy_avg_value"] - n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0 - - lower_better = _is_lower_better(direction) - direction_label = ( - "lower values indicate better outcomes" if lower_better - else "higher values indicate better outcomes" - ) - - # --- Bagian 1: Nilai rata-rata ASEAN --- - val_str = _format_value(avg_val, unit) - sentence1 = ( - f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}" - ) - if n_countries > 0: - sentence1 += ( - f", based on data from {n_countries} " - f"ASEAN member state{'s' if n_countries > 1 else ''}" - ) - sentence1 += "." - - # --- Bagian 2: Score dan performance --- - if not pd.isna(avg_score): - score_str = f"{avg_score:.1f} out of 100" - if performance == "Good": - perf_phrase = ( - f"The region achieved a normalized score of {score_str}, " - f"classified as Good performance — meeting the 60-point threshold — " - f"under the {framework} framework ({pillar} pillar)." - ) - elif performance == "Bad": - perf_phrase = ( - f"The region recorded a normalized score of {score_str}, " - f"classified as Bad performance — falling below the 60-point threshold — " - f"under the {framework} framework ({pillar} pillar)." - ) - else: - perf_phrase = ( - f"The region recorded a normalized score of {score_str} " - f"under the {framework} framework ({pillar} pillar)." - ) - else: - perf_phrase = ( - f"Performance could not be assessed due to insufficient data " - f"under the {framework} framework ({pillar} pillar)." - ) - - # --- Bagian 3 & 4: Arah + YoY --- - direction_phrase = f"Since {direction_label} for this indicator" - - if not pd.isna(yoy) and yoy != 0: - direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better) - if is_positive: - trend_word = "a positive trend" - tone = "reflecting improvements in regional food security performance" - else: - trend_word = "a deteriorating trend" - tone = "signaling the need for greater regional attention and policy response" - yoy_phrase = ( - f"{direction_phrase}, the regional average {direction_word} {change_desc} " - f"compared to {year - 1}, reflecting {trend_word} — {tone}." - ) - elif pd.isna(yoy): - yoy_phrase = ( - f"No prior year data is available for comparison, " - f"as this is the earliest recorded year for this indicator in the dataset." - ) - else: - yoy_phrase = ( - f"{direction_phrase}, the regional average remained stable " - f"compared to {year - 1}, with no measurable change year-on-year." - ) - - return f"{sentence1} {perf_phrase} {yoy_phrase}" - - # ============================================================================= # MAIN CLASS # ============================================================================= @@ -342,28 +182,18 @@ class IndicatorNormAggregator: """ Hitung norm_value per indikator untuk seluruh data di fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm. - Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator. - Alur agg_indicator_norm: - 1. Load fact_asean_food_security_selected - 2. Load dim_indicator -> ambil kolom unit - 3. Merge unit ke df - 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) - 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label - 6. Hitung norm_value per indikator (direction-aware, 0-1) - 7. Hitung YoY per (indicator_id, country_id) - 8. Scale ke 1-100 per indikator (global) - 9. Assign performance label (Good/Bad) - 10. Simpan ke BigQuery -> agg_indicator_norm - 11. Summary log agg_indicator_norm - - Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada): - 12. Agregasi ke level ASEAN (year x indicator_id) - 13. Hitung YoY avg_value per indikator - 14. Assign performance berdasarkan avg_norm_score - 15. Build narrative 1 paragraf per baris - 16. Simpan ke BigQuery -> agg_narrative_indicator - 17. Summary log agg_narrative_indicator + Alur: + 1. Load fact_asean_food_security_selected + 2. Load dim_indicator -> ambil kolom unit + 3. Merge unit ke df + 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) + 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label + 6. Hitung norm_value per indikator (direction-aware, 0-1) + 7. Hitung YoY per (indicator_id, country_id) + 8. Scale ke 1-100 per indikator (global) + 9. Assign performance label (Good/Bad) + 10. Simpan ke BigQuery """ def __init__(self, client: bigquery.Client): @@ -377,11 +207,10 @@ class IndicatorNormAggregator: self.pipeline_start = None self.pipeline_metadata = { - "rows_fetched" : 0, - "rows_loaded" : 0, - "rows_loaded_narrative" : 0, - "start_time" : None, - "end_time" : None, + "rows_fetched": 0, + "rows_loaded" : 0, + "start_time" : None, + "end_time" : None, } # ========================================================================= @@ -730,7 +559,7 @@ class IndicatorNormAggregator: return df # ========================================================================= - # STEP 10: Save agg_indicator_norm to BigQuery + # STEP 10: Save to BigQuery # ========================================================================= def _save(self, df: pd.DataFrame) -> int: @@ -852,12 +681,12 @@ class IndicatorNormAggregator: return rows_loaded # ========================================================================= - # STEP 11: Summary log agg_indicator_norm + # STEP 11: Summary log # ========================================================================= def _log_summary(self, df: pd.DataFrame): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 11: SUMMARY — agg_indicator_norm") + self.logger.info("STEP 11: SUMMARY") self.logger.info("=" * 80) summary = ( @@ -938,162 +767,6 @@ class IndicatorNormAggregator: for _, r in pillar_summary.iterrows(): self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") - # ========================================================================= - # STEP 12-16: agg_narrative_indicator (lanjutan dari df_final) - # ========================================================================= - - def _build_narrative_table(self, df_final: pd.DataFrame): - """ - Pipeline agg_narrative_indicator yang dijalankan otomatis - setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada - di memori, tanpa re-load dari BigQuery. - """ - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 12-16: agg_narrative_indicator") - self.logger.info(" Level : ASEAN (year x indicator_id)") - self.logger.info("=" * 80) - - # -- STEP 12: Agregasi ke level ASEAN -- - self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---") - dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - agg_dict = {col: "first" for col in dim_cols} - agg_dict["value"] = "mean" - agg_dict["norm_score_1_100"] = "mean" - agg_dict["country_id"] = "count" - - df_agg = ( - df_final.groupby(["year", "indicator_id"]) - .agg(agg_dict) - .reset_index() - .rename(columns={ - "value" : "avg_value", - "norm_score_1_100": "avg_norm_score_1_100", - "country_id" : "n_countries", - }) - ) - self.logger.info(f" Rows : {len(df_agg):,}") - self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}") - self.logger.info( - f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}" - ) - - # -- STEP 13: YoY avg_value per indikator -- - self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---") - parts = [] - for ind_id, grp in df_agg.groupby("indicator_id"): - grp = grp.sort_values("year").copy() - grp["prev_avg_value"] = grp["avg_value"].shift(1) - grp["yoy_avg_value"] = np.where( - grp["avg_value"].notna() & grp["prev_avg_value"].notna(), - grp["avg_value"] - grp["prev_avg_value"], - np.nan, - ) - grp = grp.drop(columns=["prev_avg_value"]) - parts.append(grp) - df_agg = pd.concat(parts, ignore_index=True) - self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}") - - # -- STEP 14: Assign performance -- - self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") - df_agg["performance"] = pd.NA - has_score = df_agg["avg_norm_score_1_100"].notna() - df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" - df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" - n_good = (df_agg["performance"] == "Good").sum() - n_bad = (df_agg["performance"] == "Bad").sum() - self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") - - # -- STEP 15: Build narrative -- - self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---") - df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1) - self.logger.info(f" Narratives generated: {len(df_agg):,}") - self.logger.info("\n Sample (first 2):") - for _, row in df_agg.head(2).iterrows(): - self.logger.info( - f"\n [{int(row['year'])}] {row['indicator_name'][:50]}" - f"\n -> {row['narrative'][:250]}..." - ) - - # -- STEP 16: Save -- - self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") - out = df_agg[[ - "year", "indicator_id", "indicator_name", "unit", "direction", - "pillar_name", "framework", - "avg_value", "avg_norm_score_1_100", "performance", - "yoy_avg_value", "n_countries", "narrative", - ]].copy() - - out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True) - - out["year"] = out["year"].astype(int) - out["indicator_id"] = out["indicator_id"].astype(int) - out["indicator_name"] = out["indicator_name"].astype(str) - out["unit"] = out["unit"].fillna("").astype(str) - out["direction"] = out["direction"].astype(str) - out["pillar_name"] = out["pillar_name"].astype(str) - out["framework"] = out["framework"].astype(str) - out["avg_value"] = out["avg_value"].astype(float) - out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float) - out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") - out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float) - out["n_countries"] = out["n_countries"].astype(int) - out["narrative"] = out["narrative"].astype(str) - - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), - bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, out, "agg_narrative_indicator", - layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema, - ) - - log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded) - self.logger.info( - f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold" - ) - - metadata = { - "source_class" : self.__class__.__name__, - "table_name" : "agg_narrative_indicator", - "execution_timestamp": self.pipeline_start, - "duration_seconds" : (datetime.now() - self.pipeline_start).total_seconds(), - "rows_fetched" : self.pipeline_metadata["rows_fetched"], - "rows_transformed" : rows_loaded, - "rows_loaded" : rows_loaded, - "completeness_pct" : 100.0, - "config_snapshot" : json.dumps({ - "source_table" : "agg_indicator_norm (in-memory df_final)", - "granularity" : "year x indicator_id (ASEAN level)", - "aggregation" : "mean across ASEAN countries", - "performance_threshold": _PERFORMANCE_THRESHOLD, - "yoy_column" : "yoy_avg_value", - "layer" : "gold", - }), - "validation_metrics" : json.dumps({ - "total_rows" : rows_loaded, - "n_indicators": int(out["indicator_id"].nunique()), - "year_min" : int(out["year"].min()), - "year_max" : int(out["year"].max()), - }), - } - save_etl_metadata(self.client, metadata) - self.logger.info(" Metadata -> [AUDIT] etl_metadata") - - self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded - # ========================================================================= # RUN # ========================================================================= @@ -1107,7 +780,6 @@ class IndicatorNormAggregator: self.logger.info(" Source : fact_asean_food_security_selected") self.logger.info(" Dim : dim_indicator (unit)") self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") - self.logger.info(" agg_narrative_indicator -> fs_asean_gold") self.logger.info("=" * 80) self.load_data() @@ -1115,17 +787,14 @@ class IndicatorNormAggregator: self._merge_unit() self.sdgs_start_year = self._detect_sdgs_start_year() self._assign_framework() - df_normed = self._compute_norm_values() - df_yoy = self._compute_yoy_columns(df_normed) - df_scored = self._compute_scores(df_yoy) - df_final = self._assign_performance(df_scored) + df_normed = self._compute_norm_values() + df_yoy = self._compute_yoy_columns(df_normed) + df_scored = self._compute_scores(df_yoy) + df_final = self._assign_performance(df_scored) rows_loaded = self._save(df_final) self.pipeline_metadata["rows_loaded"] = rows_loaded self._log_summary(df_final) - # Lanjut build agg_narrative_indicator dari df_final (tanpa re-load BQ) - self._build_narrative_table(df_final) - self.pipeline_metadata["end_time"] = datetime.now() duration = ( self.pipeline_metadata["end_time"] - self.pipeline_start @@ -1134,27 +803,25 @@ class IndicatorNormAggregator: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") - self.logger.info(f" Rows Loaded (norm) : {rows_loaded:,}") - self.logger.info(f" Rows Loaded (narrative) : {self.pipeline_metadata['rows_loaded_narrative']:,}") - self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") + self.logger.info(f" Rows Loaded : {rows_loaded:,}") + self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") # ============================================================================= -# AIRFLOW TASK <-- tidak berubah +# AIRFLOW TASK # ============================================================================= def run_indicator_norm_aggregation(): """ - Airflow task: Build agg_indicator_norm + agg_narrative_indicator. + Airflow task: Build agg_indicator_norm. Dipanggil setelah analytical_layer_to_gold selesai. """ client = get_bigquery_client() agg = IndicatorNormAggregator(client) agg.run() - print(f"agg_indicator_norm loaded : {agg.pipeline_metadata['rows_loaded']:,} rows") - print(f"agg_narrative_indicator loaded: {agg.pipeline_metadata['rows_loaded_narrative']:,} rows") + print(f"agg_indicator_norm loaded: {agg.pipeline_metadata['rows_loaded']:,} rows") # ============================================================================= @@ -1173,7 +840,6 @@ if __name__ == "__main__": print(" Source : fact_asean_food_security_selected") print(" Dim : dim_indicator (unit)") print(" Output : agg_indicator_norm") - print(" agg_narrative_indicator") print("=" * 80) logger = setup_logging() diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index b8c3a32..8e698af 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -177,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou def assign_pillar(indicator_name: str) -> str: """ Assign pillar berdasarkan keyword indikator. - Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability' + Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' All ≤ 20 chars (varchar(20) constraint). """ if pd.isna(indicator_name): - return 'Sustainability' + return 'Supporting' ind = str(indicator_name).lower() for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: if kw in ind: - return 'Sustainability' + return 'Supporting' if any(kw in ind for kw in [ 'adequacy', 'protein supply', 'supply of protein', @@ -215,7 +215,7 @@ def assign_pillar(indicator_name: str) -> str: ]): return 'Utilization' - return 'Sustainability' + return 'Supporting' # ============================================================================= diff --git a/scripts/bigquery_dimensional_model.py b/scripts/bigquery_dimensional_model.py index 735f7a2..ba8d14c 100644 --- a/scripts/bigquery_dimensional_model.py +++ b/scripts/bigquery_dimensional_model.py @@ -350,7 +350,7 @@ class DimensionalModelLoader: elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): return 'Infrastructure' else: - return 'Sustainability' + return 'Supporting' dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') @@ -471,10 +471,10 @@ class DimensionalModelLoader: try: pillar_codes = { 'Availability': 'AVL', 'Access' : 'ACC', - 'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN', + 'Utilization' : 'UTL', 'Stability': 'STB', 'Supporting': 'SPT', } pillars_data = [ - {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')} + {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'SPT')} for p in self.df_clean['pillar'].unique() ]