From 0384e62b01180d61ac21ba2cde1ce9392ccff16e Mon Sep 17 00:00:00 2001 From: Debby Date: Tue, 7 Apr 2026 10:00:45 +0700 Subject: [PATCH] bismillah done --- .../bigquery_aggraget_fact_selected_layer.py | 396 ++++++++++++++++-- scripts/bigquery_cleaned_layer.py | 8 +- scripts/bigquery_dimensional_model.py | 6 +- 3 files changed, 372 insertions(+), 38 deletions(-) diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index cd10e73..954f188 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -1,7 +1,11 @@ """ BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION -Tabel: agg_indicator_norm -> fs_asean_gold +Tabel 1: agg_indicator_norm -> fs_asean_gold +Tabel 2: agg_narrative_indicator -> fs_asean_gold +============================================================================= +agg_indicator_norm +============================================================================= Tujuan: Menghitung norm_value per indikator per negara per tahun, sehingga dapat melihat performa setiap indikator secara individual (lower_better & higher_better @@ -35,6 +39,27 @@ Output Schema (agg_indicator_norm): yoy_value, -- perubahan absolut value YoY yoy_norm_value, -- perubahan absolut norm_value YoY performance -- "Good" | "Bad" | null + +============================================================================= +agg_narrative_indicator +============================================================================= +Tujuan: + Menghasilkan narasi otomatis 1 paragraf per indikator per tahun di level ASEAN + (rata-rata seluruh negara ASEAN), dijalankan otomatis setelah agg_indicator_norm + selesai dalam pipeline yang sama. + +Granularity: + year x indicator_id (level ASEAN, bukan per negara) + +Output Schema (agg_narrative_indicator): + year, indicator_id, indicator_name, unit, direction, + pillar_name, framework, + avg_value, -- rata-rata value ASEAN + avg_norm_score_1_100, -- rata-rata norm_score_1_100 ASEAN + performance, -- Good | Bad | null + yoy_avg_value, -- perubahan avg_value vs tahun sebelumnya + n_countries, -- jumlah negara yang punya data tahun ini + narrative -- 1 paragraf narasi otomatis """ import pandas as pd @@ -113,7 +138,7 @@ _PERFORMANCE_THRESHOLD: float = 60.0 # ============================================================================= -# PURE HELPERS +# PURE HELPERS — agg_indicator_norm # ============================================================================= def _should_invert(direction: str, logger=None, context: str = "") -> bool: @@ -174,6 +199,141 @@ def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: return df +# ============================================================================= +# PURE HELPERS — agg_narrative_indicator +# ============================================================================= + +def _is_lower_better(direction: str) -> bool: + return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS + + +def _format_value(value: float, unit: str) -> str: + """Format nilai dengan unit yang sesuai.""" + if pd.isna(value): + return "N/A" + unit = str(unit).strip() if unit else "" + + if abs(value) >= 1000: + formatted = f"{value:,.1f}" + elif abs(value) >= 10: + formatted = f"{value:.2f}" + else: + formatted = f"{value:.3f}" + + return f"{formatted} {unit}".strip() + + +def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple: + """ + Kembalikan (direction_word, change_desc, is_positive_trend). + is_positive_trend: True jika perubahan menguntungkan sesuai direction. + """ + unit = str(unit).strip() if unit else "" + abs_yoy = abs(yoy) + + if abs_yoy >= 1000: + yoy_str = f"{abs_yoy:,.1f}" + elif abs_yoy >= 10: + yoy_str = f"{abs_yoy:.2f}" + else: + yoy_str = f"{abs_yoy:.3f}" + + change_desc = f"{yoy_str} {unit}".strip() + is_positive = (yoy < 0) if lower_better else (yoy > 0) + direction_word = "decreased by" if yoy < 0 else "increased by" + + return direction_word, change_desc, is_positive + + +def _build_narrative(row: pd.Series) -> str: + """ + Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id). + """ + year = int(row["year"]) + ind_name = str(row["indicator_name"]).strip() + unit = str(row["unit"]).strip() if row["unit"] else "" + direction = str(row["direction"]).strip() + pillar = str(row["pillar_name"]).strip() + framework = str(row["framework"]).strip() + avg_val = row["avg_value"] + avg_score = row["avg_norm_score_1_100"] + performance = row["performance"] + yoy = row["yoy_avg_value"] + n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0 + + lower_better = _is_lower_better(direction) + direction_label = ( + "lower values indicate better outcomes" if lower_better + else "higher values indicate better outcomes" + ) + + # --- Bagian 1: Nilai rata-rata ASEAN --- + val_str = _format_value(avg_val, unit) + sentence1 = ( + f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}" + ) + if n_countries > 0: + sentence1 += ( + f", based on data from {n_countries} " + f"ASEAN member state{'s' if n_countries > 1 else ''}" + ) + sentence1 += "." + + # --- Bagian 2: Score dan performance --- + if not pd.isna(avg_score): + score_str = f"{avg_score:.1f} out of 100" + if performance == "Good": + perf_phrase = ( + f"The region achieved a normalized score of {score_str}, " + f"classified as Good performance — meeting the 60-point threshold — " + f"under the {framework} framework ({pillar} pillar)." + ) + elif performance == "Bad": + perf_phrase = ( + f"The region recorded a normalized score of {score_str}, " + f"classified as Bad performance — falling below the 60-point threshold — " + f"under the {framework} framework ({pillar} pillar)." + ) + else: + perf_phrase = ( + f"The region recorded a normalized score of {score_str} " + f"under the {framework} framework ({pillar} pillar)." + ) + else: + perf_phrase = ( + f"Performance could not be assessed due to insufficient data " + f"under the {framework} framework ({pillar} pillar)." + ) + + # --- Bagian 3 & 4: Arah + YoY --- + direction_phrase = f"Since {direction_label} for this indicator" + + if not pd.isna(yoy) and yoy != 0: + direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better) + if is_positive: + trend_word = "a positive trend" + tone = "reflecting improvements in regional food security performance" + else: + trend_word = "a deteriorating trend" + tone = "signaling the need for greater regional attention and policy response" + yoy_phrase = ( + f"{direction_phrase}, the regional average {direction_word} {change_desc} " + f"compared to {year - 1}, reflecting {trend_word} — {tone}." + ) + elif pd.isna(yoy): + yoy_phrase = ( + f"No prior year data is available for comparison, " + f"as this is the earliest recorded year for this indicator in the dataset." + ) + else: + yoy_phrase = ( + f"{direction_phrase}, the regional average remained stable " + f"compared to {year - 1}, with no measurable change year-on-year." + ) + + return f"{sentence1} {perf_phrase} {yoy_phrase}" + + # ============================================================================= # MAIN CLASS # ============================================================================= @@ -182,18 +342,28 @@ class IndicatorNormAggregator: """ Hitung norm_value per indikator untuk seluruh data di fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm. + Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator. - Alur: - 1. Load fact_asean_food_security_selected - 2. Load dim_indicator -> ambil kolom unit - 3. Merge unit ke df - 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) - 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label - 6. Hitung norm_value per indikator (direction-aware, 0-1) - 7. Hitung YoY per (indicator_id, country_id) - 8. Scale ke 1-100 per indikator (global) - 9. Assign performance label (Good/Bad) - 10. Simpan ke BigQuery + Alur agg_indicator_norm: + 1. Load fact_asean_food_security_selected + 2. Load dim_indicator -> ambil kolom unit + 3. Merge unit ke df + 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) + 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label + 6. Hitung norm_value per indikator (direction-aware, 0-1) + 7. Hitung YoY per (indicator_id, country_id) + 8. Scale ke 1-100 per indikator (global) + 9. Assign performance label (Good/Bad) + 10. Simpan ke BigQuery -> agg_indicator_norm + 11. Summary log agg_indicator_norm + + Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada): + 12. Agregasi ke level ASEAN (year x indicator_id) + 13. Hitung YoY avg_value per indikator + 14. Assign performance berdasarkan avg_norm_score + 15. Build narrative 1 paragraf per baris + 16. Simpan ke BigQuery -> agg_narrative_indicator + 17. Summary log agg_narrative_indicator """ def __init__(self, client: bigquery.Client): @@ -207,10 +377,11 @@ class IndicatorNormAggregator: self.pipeline_start = None self.pipeline_metadata = { - "rows_fetched": 0, - "rows_loaded" : 0, - "start_time" : None, - "end_time" : None, + "rows_fetched" : 0, + "rows_loaded" : 0, + "rows_loaded_narrative" : 0, + "start_time" : None, + "end_time" : None, } # ========================================================================= @@ -559,7 +730,7 @@ class IndicatorNormAggregator: return df # ========================================================================= - # STEP 10: Save to BigQuery + # STEP 10: Save agg_indicator_norm to BigQuery # ========================================================================= def _save(self, df: pd.DataFrame) -> int: @@ -681,12 +852,12 @@ class IndicatorNormAggregator: return rows_loaded # ========================================================================= - # STEP 11: Summary log + # STEP 11: Summary log agg_indicator_norm # ========================================================================= def _log_summary(self, df: pd.DataFrame): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 11: SUMMARY") + self.logger.info("STEP 11: SUMMARY — agg_indicator_norm") self.logger.info("=" * 80) summary = ( @@ -767,6 +938,162 @@ class IndicatorNormAggregator: for _, r in pillar_summary.iterrows(): self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") + # ========================================================================= + # STEP 12-16: agg_narrative_indicator (lanjutan dari df_final) + # ========================================================================= + + def _build_narrative_table(self, df_final: pd.DataFrame): + """ + Pipeline agg_narrative_indicator yang dijalankan otomatis + setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada + di memori, tanpa re-load dari BigQuery. + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 12-16: agg_narrative_indicator") + self.logger.info(" Level : ASEAN (year x indicator_id)") + self.logger.info("=" * 80) + + # -- STEP 12: Agregasi ke level ASEAN -- + self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---") + dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] + agg_dict = {col: "first" for col in dim_cols} + agg_dict["value"] = "mean" + agg_dict["norm_score_1_100"] = "mean" + agg_dict["country_id"] = "count" + + df_agg = ( + df_final.groupby(["year", "indicator_id"]) + .agg(agg_dict) + .reset_index() + .rename(columns={ + "value" : "avg_value", + "norm_score_1_100": "avg_norm_score_1_100", + "country_id" : "n_countries", + }) + ) + self.logger.info(f" Rows : {len(df_agg):,}") + self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}") + self.logger.info( + f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}" + ) + + # -- STEP 13: YoY avg_value per indikator -- + self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---") + parts = [] + for ind_id, grp in df_agg.groupby("indicator_id"): + grp = grp.sort_values("year").copy() + grp["prev_avg_value"] = grp["avg_value"].shift(1) + grp["yoy_avg_value"] = np.where( + grp["avg_value"].notna() & grp["prev_avg_value"].notna(), + grp["avg_value"] - grp["prev_avg_value"], + np.nan, + ) + grp = grp.drop(columns=["prev_avg_value"]) + parts.append(grp) + df_agg = pd.concat(parts, ignore_index=True) + self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}") + + # -- STEP 14: Assign performance -- + self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") + df_agg["performance"] = pd.NA + has_score = df_agg["avg_norm_score_1_100"].notna() + df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" + df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" + n_good = (df_agg["performance"] == "Good").sum() + n_bad = (df_agg["performance"] == "Bad").sum() + self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") + + # -- STEP 15: Build narrative -- + self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---") + df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1) + self.logger.info(f" Narratives generated: {len(df_agg):,}") + self.logger.info("\n Sample (first 2):") + for _, row in df_agg.head(2).iterrows(): + self.logger.info( + f"\n [{int(row['year'])}] {row['indicator_name'][:50]}" + f"\n -> {row['narrative'][:250]}..." + ) + + # -- STEP 16: Save -- + self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") + out = df_agg[[ + "year", "indicator_id", "indicator_name", "unit", "direction", + "pillar_name", "framework", + "avg_value", "avg_norm_score_1_100", "performance", + "yoy_avg_value", "n_countries", "narrative", + ]].copy() + + out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True) + + out["year"] = out["year"].astype(int) + out["indicator_id"] = out["indicator_id"].astype(int) + out["indicator_name"] = out["indicator_name"].astype(str) + out["unit"] = out["unit"].fillna("").astype(str) + out["direction"] = out["direction"].astype(str) + out["pillar_name"] = out["pillar_name"].astype(str) + out["framework"] = out["framework"].astype(str) + out["avg_value"] = out["avg_value"].astype(float) + out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float) + out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") + out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float) + out["n_countries"] = out["n_countries"].astype(int) + out["narrative"] = out["narrative"].astype(str) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), + bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, out, "agg_narrative_indicator", + layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema, + ) + + log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded) + self.logger.info( + f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold" + ) + + metadata = { + "source_class" : self.__class__.__name__, + "table_name" : "agg_narrative_indicator", + "execution_timestamp": self.pipeline_start, + "duration_seconds" : (datetime.now() - self.pipeline_start).total_seconds(), + "rows_fetched" : self.pipeline_metadata["rows_fetched"], + "rows_transformed" : rows_loaded, + "rows_loaded" : rows_loaded, + "completeness_pct" : 100.0, + "config_snapshot" : json.dumps({ + "source_table" : "agg_indicator_norm (in-memory df_final)", + "granularity" : "year x indicator_id (ASEAN level)", + "aggregation" : "mean across ASEAN countries", + "performance_threshold": _PERFORMANCE_THRESHOLD, + "yoy_column" : "yoy_avg_value", + "layer" : "gold", + }), + "validation_metrics" : json.dumps({ + "total_rows" : rows_loaded, + "n_indicators": int(out["indicator_id"].nunique()), + "year_min" : int(out["year"].min()), + "year_max" : int(out["year"].max()), + }), + } + save_etl_metadata(self.client, metadata) + self.logger.info(" Metadata -> [AUDIT] etl_metadata") + + self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded + # ========================================================================= # RUN # ========================================================================= @@ -780,6 +1107,7 @@ class IndicatorNormAggregator: self.logger.info(" Source : fact_asean_food_security_selected") self.logger.info(" Dim : dim_indicator (unit)") self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") + self.logger.info(" agg_narrative_indicator -> fs_asean_gold") self.logger.info("=" * 80) self.load_data() @@ -787,14 +1115,17 @@ class IndicatorNormAggregator: self._merge_unit() self.sdgs_start_year = self._detect_sdgs_start_year() self._assign_framework() - df_normed = self._compute_norm_values() - df_yoy = self._compute_yoy_columns(df_normed) - df_scored = self._compute_scores(df_yoy) - df_final = self._assign_performance(df_scored) + df_normed = self._compute_norm_values() + df_yoy = self._compute_yoy_columns(df_normed) + df_scored = self._compute_scores(df_yoy) + df_final = self._assign_performance(df_scored) rows_loaded = self._save(df_final) self.pipeline_metadata["rows_loaded"] = rows_loaded self._log_summary(df_final) + # Lanjut build agg_narrative_indicator dari df_final (tanpa re-load BQ) + self._build_narrative_table(df_final) + self.pipeline_metadata["end_time"] = datetime.now() duration = ( self.pipeline_metadata["end_time"] - self.pipeline_start @@ -803,25 +1134,27 @@ class IndicatorNormAggregator: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") - self.logger.info(f" Rows Loaded : {rows_loaded:,}") - self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") + self.logger.info(f" Rows Loaded (norm) : {rows_loaded:,}") + self.logger.info(f" Rows Loaded (narrative) : {self.pipeline_metadata['rows_loaded_narrative']:,}") + self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") # ============================================================================= -# AIRFLOW TASK +# AIRFLOW TASK <-- tidak berubah # ============================================================================= def run_indicator_norm_aggregation(): """ - Airflow task: Build agg_indicator_norm. + Airflow task: Build agg_indicator_norm + agg_narrative_indicator. Dipanggil setelah analytical_layer_to_gold selesai. """ client = get_bigquery_client() agg = IndicatorNormAggregator(client) agg.run() - print(f"agg_indicator_norm loaded: {agg.pipeline_metadata['rows_loaded']:,} rows") + print(f"agg_indicator_norm loaded : {agg.pipeline_metadata['rows_loaded']:,} rows") + print(f"agg_narrative_indicator loaded: {agg.pipeline_metadata['rows_loaded_narrative']:,} rows") # ============================================================================= @@ -840,6 +1173,7 @@ if __name__ == "__main__": print(" Source : fact_asean_food_security_selected") print(" Dim : dim_indicator (unit)") print(" Output : agg_indicator_norm") + print(" agg_narrative_indicator") print("=" * 80) logger = setup_logging() diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index 8e698af..b8c3a32 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -177,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou def assign_pillar(indicator_name: str) -> str: """ Assign pillar berdasarkan keyword indikator. - Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' + Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability' All ≤ 20 chars (varchar(20) constraint). """ if pd.isna(indicator_name): - return 'Supporting' + return 'Sustainability' ind = str(indicator_name).lower() for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: if kw in ind: - return 'Supporting' + return 'Sustainability' if any(kw in ind for kw in [ 'adequacy', 'protein supply', 'supply of protein', @@ -215,7 +215,7 @@ def assign_pillar(indicator_name: str) -> str: ]): return 'Utilization' - return 'Supporting' + return 'Sustainability' # ============================================================================= diff --git a/scripts/bigquery_dimensional_model.py b/scripts/bigquery_dimensional_model.py index ba8d14c..735f7a2 100644 --- a/scripts/bigquery_dimensional_model.py +++ b/scripts/bigquery_dimensional_model.py @@ -350,7 +350,7 @@ class DimensionalModelLoader: elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): return 'Infrastructure' else: - return 'Supporting' + return 'Sustainability' dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') @@ -471,10 +471,10 @@ class DimensionalModelLoader: try: pillar_codes = { 'Availability': 'AVL', 'Access' : 'ACC', - 'Utilization' : 'UTL', 'Stability': 'STB', 'Supporting': 'SPT', + 'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN', } pillars_data = [ - {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'SPT')} + {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')} for p in self.df_clean['pillar'].unique() ]