From f9d013f8e6c2e7a29f90d87c3228b4d9468d7c07 Mon Sep 17 00:00:00 2001 From: Debby Date: Wed, 22 Apr 2026 16:02:05 +0700 Subject: [PATCH] new narrative teks --- .../bigquery_aggraget_fact_selected_layer.py | 406 ++++++------------ scripts/bigquery_aggregate_layer.py | 264 +++--------- 2 files changed, 206 insertions(+), 464 deletions(-) diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index ecd8422..cf7ae4c 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -44,22 +44,24 @@ Output Schema (agg_indicator_norm): agg_narrative_indicator ============================================================================= Tujuan: - Menghasilkan narasi otomatis 1 paragraf per indikator per tahun di level ASEAN - (rata-rata seluruh negara ASEAN), dijalankan otomatis setelah agg_indicator_norm - selesai dalam pipeline yang sama. + Menghasilkan narasi otomatis 1 paragraf per indikator (level ASEAN, + merangkum seluruh periode + seluruh negara), dijalankan otomatis setelah + agg_indicator_norm selesai dalam pipeline yang sama. Granularity: - year x indicator_id (level ASEAN, bukan per negara) + indicator_id (all years, all ASEAN countries) Output Schema (agg_narrative_indicator): - year, indicator_id, indicator_name, unit, direction, + indicator_id, indicator_name, unit, direction, pillar_name, framework, - avg_value, -- rata-rata value ASEAN - avg_norm_score_1_100, -- rata-rata norm_score_1_100 ASEAN + year_min, year_max, n_countries, + avg_value_first, avg_value_last, + avg_norm_score_1_100, performance, -- Good | Bad | null - yoy_avg_value, -- perubahan avg_value vs tahun sebelumnya - n_countries, -- jumlah negara yang punya data tahun ini - narrative -- 1 paragraf narasi otomatis + n_yoy_total, n_yoy_positive, + best_yoy_from, best_yoy_to, + country_worst, country_best, + narrative """ import pandas as pd @@ -114,10 +116,8 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([ "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) -# Lowercase set untuk matching case-insensitive _SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS) -# FIES-specific keywords untuk deteksi sdgs_start_year _FIES_DETECTION_KEYWORDS: frozenset = frozenset([ "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", @@ -133,12 +133,11 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({ "positive", "higher_better", "higher_is_better", }) -# Threshold performance label _PERFORMANCE_THRESHOLD: float = 60.0 # ============================================================================= -# PURE HELPERS — agg_indicator_norm +# PURE HELPERS # ============================================================================= def _should_invert(direction: str, logger=None, context: str = "") -> bool: @@ -171,12 +170,8 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: """ Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year. - - Kolom yang ditambahkan: - yoy_value : value - value_prev - yoy_norm_value : norm_value - norm_value_prev - - Baris pertama tiap grup selalu null (tidak ada tahun sebelumnya). + Kolom yang ditambahkan: yoy_value, yoy_norm_value. + Baris pertama tiap grup selalu null. """ df = df.sort_values("year").copy() @@ -199,70 +194,22 @@ def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: return df -# ============================================================================= -# PURE HELPERS — agg_narrative_indicator -# ============================================================================= - def _is_lower_better(direction: str) -> bool: return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS -def _format_value(value: float, unit: str) -> str: - """Format nilai dengan unit yang sesuai.""" - if pd.isna(value): - return "N/A" - unit = str(unit).strip() if unit else "" - - if abs(value) >= 1000: - formatted = f"{value:,.1f}" - elif abs(value) >= 10: - formatted = f"{value:.2f}" - else: - formatted = f"{value:.3f}" - - return f"{formatted} {unit}".strip() - - -def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple: - """ - Kembalikan (direction_word, change_desc, is_positive_trend). - is_positive_trend: True jika perubahan menguntungkan sesuai direction. - """ - unit = str(unit).strip() if unit else "" - abs_yoy = abs(yoy) - - if abs_yoy >= 1000: - yoy_str = f"{abs_yoy:,.1f}" - elif abs_yoy >= 10: - yoy_str = f"{abs_yoy:.2f}" - else: - yoy_str = f"{abs_yoy:.3f}" - - change_desc = f"{yoy_str} {unit}".strip() - is_positive = (yoy < 0) if lower_better else (yoy > 0) - direction_word = "decreased by" if yoy < 0 else "increased by" - - return direction_word, change_desc, is_positive - # ============================================================================= -# PURE HELPER — narrative builder (per indicator, all years, all countries) -# ====================================================================== +# NARRATIVE BUILDER — agg_narrative_indicator +# ============================================================================= def _build_narrative_per_indicator(row: pd.Series) -> str: """ - Bangun 1 paragraf narasi ASEAN-level untuk satu indikator, - merangkum seluruh periode (year_min - year_max) dan seluruh negara. - - Kolom yang dibutuhkan dari row: - indicator_name, unit, direction, pillar_name, framework, - year_min, year_max, n_countries, - avg_value_first, avg_value_last, - avg_norm_score_1_100, -- rata-rata seluruh periode - performance, -- Good | Bad | null - n_yoy_total, -- total transisi year-on-year - n_yoy_positive, -- jumlah transisi yang membaik - best_yoy_from, best_yoy_to, -- periode dengan perbaikan terbesar - country_worst, country_best -- negara dengan nilai terburuk / terbaik + Narrative format (no em-dash, bold on key figures): + **{indicator}** ({framework}, {pillar}): ASEAN average {rose/fell} from + **{first}** to **{last}** ({year_min} to {year_max}), **{improving/deteriorating}** trend. + Score: **{score}/100** (*{Good/Bad}*). + Best country: **{best}**; worst: **{worst}**. + Improved in **{n_pos}/{n_total}** YoY transitions. """ ind_name = str(row["indicator_name"]).strip() unit = str(row["unit"]).strip() if row["unit"] else "" @@ -274,132 +221,81 @@ def _build_narrative_per_indicator(row: pd.Series) -> str: n_countries = int(row["n_countries"]) avg_score = row["avg_norm_score_1_100"] performance = row["performance"] - + avg_first = row["avg_value_first"] avg_last = row["avg_value_last"] - + n_yoy_total = int(row["n_yoy_total"]) if not pd.isna(row["n_yoy_total"]) else 0 n_yoy_positive = int(row["n_yoy_positive"]) if not pd.isna(row["n_yoy_positive"]) else 0 - + best_yoy_from = row["best_yoy_from"] best_yoy_to = row["best_yoy_to"] - + country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None - - lower_better = _is_lower_better(direction) - direction_label = ( - "lower values indicate better outcomes" - if lower_better - else "higher values indicate better outcomes" - ) - - # ---- Kalimat 1: Identifikasi indikator + cakupan ------------------------- - member_str = f"{n_countries} member state{'s' if n_countries > 1 else ''}" - sentence1 = ( - f"Across ASEAN, {ind_name} under the {framework} framework " - f"({pillar} pillar) was monitored from {year_min} to {year_max} " - f"across {member_str}." - ) - - # ---- Kalimat 2: Tren keseluruhan (first → last) -------------------------- + + lower_better = _is_lower_better(direction) + + def _fmt(v): + if pd.isna(v): + return "N/A" + abs_v = abs(v) + if abs_v >= 1000: + s = f"{v:,.1f}" + elif abs_v >= 10: + s = f"{v:.2f}" + else: + s = f"{v:.3f}" + return f"{s} {unit}".strip() if unit else s + + # Sentence 1: trend first -> last if not pd.isna(avg_first) and not pd.isna(avg_last): - diff = avg_last - avg_first - abs_diff = abs(diff) - - # Format nilai - def fmt(v): - if abs(v) >= 1000: - return f"{v:,.1f}" - elif abs(v) >= 10: - return f"{v:.2f}" - else: - return f"{v:.3f}" - - first_str = f"{fmt(avg_first)}{' ' + unit if unit else ''}" - last_str = f"{fmt(avg_last)}{' ' + unit if unit else ''}" - diff_str = f"{fmt(abs_diff)}{' ' + unit if unit else ''}" - - # Apakah tren menguntungkan? + diff = avg_last - avg_first is_improving = (diff < 0) if lower_better else (diff > 0) - trend_word = "improving" if is_improving else "deteriorating" - verb = "declining" if diff < 0 else "rising" - - sentence2 = ( - f"Since {direction_label}, the region collectively showed " - f"{'an' if trend_word[0] in 'aeiou' else 'a'} {trend_word} trend, " - f"with the ASEAN average {verb} from {first_str} in {year_min} " - f"to {last_str} in {year_max} " - f"(a cumulative {'reduction' if diff < 0 else 'increase'} of {diff_str})." + trend_label = "improving" if is_improving else "deteriorating" + verb = "fell" if diff < 0 else "rose" + sent1 = ( + f"**{ind_name}** ({framework}, {pillar}): ASEAN average {verb} from " + f"**{_fmt(avg_first)}** to **{_fmt(avg_last)}** ({year_min} to {year_max}), " + f"**{trend_label}** trend." ) else: - sentence2 = ( - f"Since {direction_label}, trend analysis could not be performed " - f"due to missing data at the start or end of the period." + sent1 = ( + f"**{ind_name}** ({framework}, {pillar}): trend data unavailable " + f"({year_min} to {year_max}, {n_countries} members)." ) - - # ---- Kalimat 3: Score + performance ------------------------------------- + + # Sentence 2: score + performance if not pd.isna(avg_score): - score_str = f"{avg_score:.1f} out of 100" - if performance == "Good": - sentence3 = ( - f"The regional normalized score averaged {score_str} " - f"classified as Good performance." - ) - elif performance == "Bad": - sentence3 = ( - f"The regional normalized score averaged {score_str} " - f"classified as Bad performance, falling below the 60-point threshold." - ) - else: - sentence3 = ( - f"The regional normalized score averaged {score_str}." - ) + perf_label = f"*{performance}*" if performance in ("Good", "Bad") else "" + sent2 = f"Score: **{avg_score:.1f}/100** {perf_label}.".strip() else: - sentence3 = "The regional normalized performance score could not be assessed." - - # ---- Kalimat 4: Negara terbaik & terburuk -------------------------------- - if country_worst and country_best and country_worst != country_best: - if lower_better: - worst_label = "highest (most concerning)" - best_label = "consistently performed best (lowest values)" - else: - worst_label = "lowest (most concerning)" - best_label = "consistently performed best (highest values)" - - sentence4 = ( - f"Among member states, {country_worst} recorded the {worst_label} " - f"levels throughout the period, while {country_best} {best_label}." - ) + sent2 = "Score unavailable." + + # Sentence 3: best / worst country + if country_best and country_worst and country_best != country_worst: + sent3 = f"Best country: **{country_best}**; worst: **{country_worst}**." elif country_best: - sentence4 = ( - f"Among member states, {country_best} consistently recorded the " - f"best performance throughout the period." - ) + sent3 = f"Best country: **{country_best}**." else: - sentence4 = "" - - # ---- Kalimat 5: YoY transitions ----------------------------------------- + sent3 = "" + + # Sentence 4: YoY transitions if n_yoy_total > 0: - yoy_sentence = ( - f"Year-on-year, the region improved in {n_yoy_positive} out of " - f"{n_yoy_total} transition{'s' if n_yoy_total > 1 else ''}" - ) + best_period = "" if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to): - yoy_sentence += ( - f", with the largest regional gain occurring between " - f"{int(best_yoy_from)} and {int(best_yoy_to)}." - ) - else: - yoy_sentence += "." + best_period = f", best gain: **{int(best_yoy_from)} to {int(best_yoy_to)}**" + sent4 = ( + f"Improved in **{n_yoy_positive}/{n_yoy_total}** YoY transitions{best_period}." + ) else: - yoy_sentence = "Insufficient data to assess year-on-year transitions." - - parts = [sentence1, sentence2, sentence3] - if sentence4: - parts.append(sentence4) - parts.append(yoy_sentence) - + sent4 = "Insufficient data for YoY assessment." + + parts = [sent1, sent2] + if sent3: + parts.append(sent3) + parts.append(sent4) + return " ".join(parts) @@ -427,10 +323,10 @@ class IndicatorNormAggregator: 11. Summary log agg_indicator_norm Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada): - 12. Agregasi ke level ASEAN (year x indicator_id) + 12. Agregasi ke level ASEAN per indicator_id 13. Hitung YoY avg_value per indikator 14. Assign performance berdasarkan avg_norm_score - 15. Build narrative 1 paragraf per baris + 15. Build narrative 1 paragraf per indikator 16. Simpan ke BigQuery -> agg_narrative_indicator 17. Summary log agg_narrative_indicator """ @@ -769,11 +665,6 @@ class IndicatorNormAggregator: # ========================================================================= def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame: - """ - performance = "Good" jika norm_score_1_100 >= 60 - = "Bad" jika norm_score_1_100 < 60 - = null jika norm_score_1_100 null - """ self.logger.info("\n" + "=" * 80) self.logger.info( f"STEP 9: ASSIGN PERFORMANCE LABEL " @@ -832,7 +723,6 @@ class IndicatorNormAggregator: ["year", "country_name", "pillar_name", "indicator_name"] ).reset_index(drop=True) - # Cast out["year"] = out["year"].astype(int) out["country_id"] = out["country_id"].astype(int) out["country_name"] = out["country_name"].astype(str) @@ -949,7 +839,6 @@ class IndicatorNormAggregator: f"{r['avg_score']:.2f}" ) - # Performance summary per framework self.logger.info("\n Performance summary per Framework:") perf_fw = ( df[df["performance"].notna()] @@ -967,7 +856,6 @@ class IndicatorNormAggregator: f"({r['count']/total*100:.1f}%)" ) - # Top 5 & Bottom 5 indikator ind_avg = ( df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"]) ["norm_score_1_100"].mean() @@ -995,7 +883,6 @@ class IndicatorNormAggregator: f"{r['norm_score_1_100']:.2f} {tag} {unit}" ) - # Indikator per pillar pillar_summary = ( df.drop_duplicates(subset=["indicator_id", "pillar_name"]) .groupby("pillar_name")["indicator_id"] @@ -1008,29 +895,24 @@ class IndicatorNormAggregator: self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") # ========================================================================= - # STEP 12-16: agg_narrative_indicator (lanjutan dari df_final) + # STEP 12-16: agg_narrative_indicator # ========================================================================= def _build_narrative_table(self, df_final: pd.DataFrame): """ - Pipeline agg_narrative_indicator — granularity: per indicator_id (1 baris per indikator). - Narasi merangkum seluruh periode + seluruh negara ASEAN. - Dijalankan otomatis setelah agg_indicator_norm selesai. + Pipeline agg_narrative_indicator. + Granularity: per indicator_id (1 baris per indikator, all years, all countries). """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 12-16: agg_narrative_indicator") self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)") self.logger.info("=" * 80) - - # -- STEP 12: Hitung statistik agregat per (indicator_id, country_id, year) -- - self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---") - + df = df_final.copy() - - # Dimensi tetap per indikator dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - - # ---- 12a. ASEAN avg per (indicator_id, year) -> untuk first/last & YoY --- + + # ---- 12a. ASEAN avg per (indicator_id, year) ------------------------- + self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---") df_yr = ( df.groupby(["indicator_id", "year"]) .agg( @@ -1040,41 +922,38 @@ class IndicatorNormAggregator: ) .reset_index() ) - - # ---- 12b. first year / last year avg value per indikator ----------------- + + # ---- 12b. first / last avg value per indikator ----------------------- df_first = ( df_yr.sort_values("year") - .groupby("indicator_id") - .first() - .reset_index()[["indicator_id", "year", "avg_value"]] + .groupby("indicator_id").first().reset_index() + [["indicator_id", "year", "avg_value"]] .rename(columns={"year": "year_min", "avg_value": "avg_value_first"}) ) df_last = ( df_yr.sort_values("year") - .groupby("indicator_id") - .last() - .reset_index()[["indicator_id", "year", "avg_value"]] + .groupby("indicator_id").last().reset_index() + [["indicator_id", "year", "avg_value"]] .rename(columns={"year": "year_max", "avg_value": "avg_value_last"}) ) - - # ---- 12c. Rata-rata norm_score seluruh periode ---------------------------- + + # ---- 12c. Rata-rata norm_score seluruh periode ----------------------- df_score_avg = ( df_yr.groupby("indicator_id") .agg(avg_norm_score_1_100=("avg_norm_score", "mean")) .reset_index() ) - - # ---- 12d. n_countries: maks negara yang pernah hadir --------------------- + + # ---- 12d. n_countries ------------------------------------------------ df_nc = ( df.groupby("indicator_id")["country_id"] - .nunique() - .reset_index() + .nunique().reset_index() .rename(columns={"country_id": "n_countries"}) ) - - # ---- 12e. YoY per (indicator_id) di level ASEAN avg ---------------------- + + # ---- 12e. YoY per indicator (ASEAN avg) ------------------------------ self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---") - + yoy_parts = [] for ind_id, grp in df_yr.groupby("indicator_id"): grp = grp.sort_values("year").copy() @@ -1087,44 +966,36 @@ class IndicatorNormAggregator: grp = grp.drop(columns=["prev_avg"]) yoy_parts.append(grp) df_yr = pd.concat(yoy_parts, ignore_index=True) - - # Ambil direction per indikator untuk tentukan "improving" + dir_map = ( df[["indicator_id", "direction"]] .drop_duplicates(subset=["indicator_id"]) .set_index("indicator_id")["direction"] .to_dict() ) - + def _is_positive_yoy(ind_id, yoy_val): - """True jika perubahan yoy menguntungkan sesuai direction.""" if pd.isna(yoy_val): return False lb = _is_lower_better(dir_map.get(ind_id, "positive")) return (yoy_val < 0) if lb else (yoy_val > 0) - - # Hitung n_yoy_total, n_yoy_positive, best_yoy + yoy_stats = [] for ind_id, grp in df_yr.groupby("indicator_id"): grp_yoy = grp[grp["yoy"].notna()].copy() lb = _is_lower_better(dir_map.get(ind_id, "positive")) - n_total = len(grp_yoy) n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"])) - - # "Best" = perubahan paling menguntungkan + if n_total > 0: - if lb: - idx_best = grp_yoy["yoy"].idxmin() # paling negatif = paling baik - else: - idx_best = grp_yoy["yoy"].idxmax() # paling positif = paling baik + idx_best = grp_yoy["yoy"].idxmin() if lb else grp_yoy["yoy"].idxmax() best_row = grp_yoy.loc[idx_best] best_yoy_from = best_row["year"] - 1 best_yoy_to = best_row["year"] else: best_yoy_from = np.nan best_yoy_to = np.nan - + yoy_stats.append({ "indicator_id" : ind_id, "n_yoy_total" : n_total, @@ -1132,16 +1003,16 @@ class IndicatorNormAggregator: "best_yoy_from" : best_yoy_from, "best_yoy_to" : best_yoy_to, }) - + df_yoy_stats = pd.DataFrame(yoy_stats) - - # ---- 12f. Country terbaik & terburuk (rata-rata value seluruh periode) --- + + # ---- 12f. Country terbaik & terburuk --------------------------------- df_country_avg = ( df.groupby(["indicator_id", "country_id", "country_name"]) .agg(country_avg_value=("value", "mean")) .reset_index() ) - + country_stats = [] for ind_id, grp in df_country_avg.groupby("indicator_id"): lb = _is_lower_better(dir_map.get(ind_id, "positive")) @@ -1152,33 +1023,33 @@ class IndicatorNormAggregator: worst_row = grp.loc[grp["country_avg_value"].idxmin()] best_row = grp.loc[grp["country_avg_value"].idxmax()] country_stats.append({ - "indicator_id": ind_id, + "indicator_id" : ind_id, "country_worst": worst_row["country_name"], "country_best" : best_row["country_name"], }) df_country_stats = pd.DataFrame(country_stats) - - # ---- 12g. Dimensi tetap per indikator ------------------------------------ + + # ---- 12g. Dimensi tetap per indikator -------------------------------- df_dim = ( df[["indicator_id"] + dim_cols] .drop_duplicates(subset=["indicator_id"]) ) - - # ---- 12h. Merge semua ------------------------------------------------------- + + # ---- 12h. Merge semua ------------------------------------------------ df_agg = ( df_dim - .merge(df_first, on="indicator_id", how="left") - .merge(df_last, on="indicator_id", how="left") - .merge(df_score_avg, on="indicator_id", how="left") - .merge(df_nc, on="indicator_id", how="left") - .merge(df_yoy_stats, on="indicator_id", how="left") - .merge(df_country_stats,on="indicator_id", how="left") + .merge(df_first, on="indicator_id", how="left") + .merge(df_last, on="indicator_id", how="left") + .merge(df_score_avg, on="indicator_id", how="left") + .merge(df_nc, on="indicator_id", how="left") + .merge(df_yoy_stats, on="indicator_id", how="left") + .merge(df_country_stats, on="indicator_id", how="left") ) - + self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}") self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}") - - # -- STEP 14: Assign performance -------------------------------------------- + + # -- STEP 14: Assign performance --------------------------------------- self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") df_agg["performance"] = pd.NA has_score = df_agg["avg_norm_score_1_100"].notna() @@ -1187,8 +1058,8 @@ class IndicatorNormAggregator: n_good = (df_agg["performance"] == "Good").sum() n_bad = (df_agg["performance"] == "Bad").sum() self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") - - # -- STEP 15: Build narrative ----------------------------------------------- + + # -- STEP 15: Build narrative ------------------------------------------ self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---") df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1) self.logger.info(f" Narratives generated: {len(df_agg):,}") @@ -1198,8 +1069,8 @@ class IndicatorNormAggregator: f"\n [{int(row['indicator_id'])}] {row['indicator_name'][:60]}" f"\n -> {row['narrative'][:300]}..." ) - - # -- STEP 16: Save ---------------------------------------------------------- + + # -- STEP 16: Save ----------------------------------------------------- self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") out = df_agg[[ "indicator_id", "indicator_name", "unit", "direction", @@ -1212,10 +1083,9 @@ class IndicatorNormAggregator: "country_worst", "country_best", "narrative", ]].copy() - + out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True) - - # Cast + out["indicator_id"] = out["indicator_id"].astype(int) out["indicator_name"] = out["indicator_name"].astype(str) out["unit"] = out["unit"].fillna("").astype(str) @@ -1236,7 +1106,7 @@ class IndicatorNormAggregator: out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string") out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string") out["narrative"] = out["narrative"].astype(str) - + schema = [ bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), @@ -1259,17 +1129,17 @@ class IndicatorNormAggregator: bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"), bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), ] - + rows_loaded = load_to_bigquery( self.client, out, "agg_narrative_indicator", layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema, ) - + log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded) self.logger.info( f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold" ) - + metadata = { "source_class" : self.__class__.__name__, "table_name" : "agg_narrative_indicator", @@ -1293,9 +1163,8 @@ class IndicatorNormAggregator: } save_etl_metadata(self.client, metadata) self.logger.info(" Metadata -> [AUDIT] etl_metadata") - + self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded - # ========================================================================= # RUN @@ -1326,7 +1195,6 @@ class IndicatorNormAggregator: self.pipeline_metadata["rows_loaded"] = rows_loaded self._log_summary(df_final) - # Lanjut build agg_narrative_indicator dari df_final (tanpa re-load BQ) self._build_narrative_table(df_final) self.pipeline_metadata["end_time"] = datetime.now() @@ -1345,7 +1213,7 @@ class IndicatorNormAggregator: # ============================================================================= -# AIRFLOW TASK <-- tidak berubah +# AIRFLOW TASK # ============================================================================= def run_indicator_norm_aggregation(): diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index a78a1de..e2d38f9 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -219,113 +219,58 @@ def _build_overview_narrative( most_declined_country, most_declined_delta, ) -> str: - # Sentence 1: indicator breakdown - parts_ind = [] + """ + Narrative format (no em-dash): + In {year}, ASEAN scored {score} ({performance}) across {n_total} indicators + ({n_mdg} MDGs, {n_sdg} SDGs). Score {increased/decreased} by {delta} pts from + {prev_year} ({prev_score}). {top_country} led the region; {bottom_country} ranked + last. Biggest gain: {country}; biggest drop: {country}. + """ + + # Sentence 1: score + performance + indicators + ind_parts = [] if n_mdg > 0: - parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") + ind_parts.append(f"**{n_mdg} MDGs**") if n_sdg > 0: - parts_ind.append(f"{n_sdg} SDG indicator{'s' if n_sdg > 1 else ''}") + ind_parts.append(f"**{n_sdg} SDGs**") + ind_detail = f" ({', '.join(ind_parts)})" if ind_parts else "" - if parts_ind: - ind_detail = " and ".join(parts_ind) - sent1 = ( - f"In {year}, the ASEAN food security assessment incorporated a total of " - f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}, " - f"consisting of {ind_detail}." - ) - else: - sent1 = ( - f"In {year}, the ASEAN food security assessment incorporated " - f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." - ) - - # Sentence 2: score + performance status + YoY - status_phrase = ( - f"classified as \"{performance_status}\" performance " - f"(threshold: {PERFORMANCE_THRESHOLD:.0f})" + sent1 = ( + f"In **{year}**, ASEAN scored **{_fmt_score(score)}** (*{performance_status}*) " + f"across **{n_total_ind} indicators**{ind_detail}." ) + + # Sentence 2: YoY if yoy_val is not None and prev_score is not None: - direction_word = "increasing" if yoy_val >= 0 else "decreasing" - pct_clause = "" - if yoy_pct is not None: - abs_pct = abs(yoy_pct) - trend_word = "improvement" if yoy_val >= 0 else "decline" - pct_clause = f", representing a {abs_pct:.2f}% {trend_word} year-over-year" - - status_change = "" - if prev_performance_status not in ("N/A", None) and prev_performance_status != performance_status: - status_change = ( - f" This marks a shift from \"{prev_performance_status}\" in {prev_year} " - f"to \"{performance_status}\" in {year}." - ) - + direction_word = "increased" if yoy_val >= 0 else "decreased" sent2 = ( - f"The ASEAN overall score (Total framework) reached {_fmt_score(score)}, " - f"{status_phrase}, {direction_word} by {abs(yoy_val):.2f} points compared to " - f"{prev_year} ({_fmt_score(prev_score)}, \"{prev_performance_status}\"){pct_clause}.{status_change}" + f"Score {direction_word} by **{abs(yoy_val):.2f} pts** " + f"from {prev_year} ({_fmt_score(prev_score)}, *{prev_performance_status}*)." ) else: - sent2 = ( - f"The ASEAN overall score (Total framework) reached {_fmt_score(score)} in {year}, " - f"{status_phrase}. No prior-year data is available for year-over-year comparison." - ) + sent2 = "No prior-year data available for comparison." # Sentence 3: country ranking sent3 = "" if ranking_list: - first = ranking_list[0] - last = ranking_list[-1] - middle = ranking_list[1:-1] - + first = ranking_list[0] + last = ranking_list[-1] if len(ranking_list) == 1: - sent3 = ( - f"In terms of country performance, {first['country_name']} was the only " - f"country assessed, scoring {_fmt_score(first['score'])} in {year}." - ) - elif len(ranking_list) == 2: - sent3 = ( - f"In terms of country performance, {first['country_name']} led the region " - f"with a score of {_fmt_score(first['score'])}, while " - f"{last['country_name']} recorded the lowest score of " - f"{_fmt_score(last['score'])} in {year}." - ) + sent3 = f"**{first['country_name']}** was the only country assessed ({_fmt_score(first['score'])})." else: - middle_parts = [ - f"{c['country_name']} ({_fmt_score(c['score'])})" for c in middle - ] - if len(middle_parts) == 1: - middle_str = middle_parts[0] - else: - middle_str = ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}" sent3 = ( - f"In terms of country performance, {first['country_name']} led the region " - f"with a score of {_fmt_score(first['score'])}, followed by {middle_str}. " - f"At the other end, {last['country_name']} recorded the lowest score " - f"of {_fmt_score(last['score'])} in {year}." + f"**{first['country_name']}** led the region ({_fmt_score(first['score'])}); " + f"**{last['country_name']}** ranked last ({_fmt_score(last['score'])})." ) - # Sentence 4: most improved / declined country + # Sentence 4: most improved / declined sent4_parts = [] if most_improved_country and most_improved_delta is not None: - sent4_parts.append( - f"the most notable improvement was seen in {most_improved_country}, " - f"which gained {_fmt_delta(most_improved_delta)} points from the previous year" - ) + sent4_parts.append(f"Biggest gain: **{most_improved_country}** ({_fmt_delta(most_improved_delta)} pts)") if most_declined_country and most_declined_delta is not None: - if most_declined_delta < 0: - sent4_parts.append( - f"while {most_declined_country} experienced the largest decline " - f"of {_fmt_delta(most_declined_delta)} points" - ) - else: - sent4_parts.append( - f"while {most_declined_country} recorded the smallest gain " - f"of {_fmt_delta(most_declined_delta)} points" - ) - - sent4 = "" - if sent4_parts: - sent4 = ", ".join(sent4_parts) + "." + sent4_parts.append(f"biggest drop: **{most_declined_country}** ({_fmt_delta(most_declined_delta)} pts)") + sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else "" + if sent4: sent4 = sent4[0].upper() + sent4[1:] return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) @@ -351,70 +296,55 @@ def _build_pillar_narrative( most_declined_pillar, most_declined_delta, ) -> str: + """ + Narrative format (no em-dash): + In {year}, {pillar} ranked {rank}/{n} with score {score}, {up/down} {delta} pts YoY. + Top country: {top_country}; bottom: {bot_country}. + Strongest pillar: {pillar}; weakest: {pillar}. + """ + rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") + + # Sentence 1: rank + score + YoY + if yoy_val is not None: + direction_word = "up" if yoy_val >= 0 else "down" + yoy_clause = f", {direction_word} **{abs(yoy_val):.2f} pts** YoY" + else: + yoy_clause = ", no prior-year data" + sent1 = ( - f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " - f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." + f"In **{year}**, **{pillar_name}** ranked **{rank_in_year}{rank_suffix}/{n_pillars}** " + f"with score **{_fmt_score(pillar_score)}**{yoy_clause}." ) + # Sentence 2: top / bottom country sent2 = "" - if strongest_pillar and weakest_pillar: - if strongest_pillar == pillar_name: - sent2 = ( - f"This made {pillar_name} the strongest performing pillar in {year}, " - f"compared to the weakest pillar, {weakest_pillar}, " - f"which scored {_fmt_score(weakest_score)}." - ) - elif weakest_pillar == pillar_name: - sent2 = ( - f"This made {pillar_name} the weakest performing pillar in {year}, " - f"compared to the strongest pillar, {strongest_pillar}, " - f"which scored {_fmt_score(strongest_score)}." - ) - else: - sent2 = ( - f"Across all pillars in {year}, {strongest_pillar} was the strongest " - f"(score: {_fmt_score(strongest_score)}), while {weakest_pillar} " - f"was the weakest (score: {_fmt_score(weakest_score)})." - ) - - sent3 = "" if top_country and bot_country: if top_country != bot_country: - sent3 = ( - f"Within the {pillar_name} pillar, {top_country} led with a score of " - f"{_fmt_score(top_country_score)}, while {bot_country} recorded the lowest " - f"score of {_fmt_score(bot_country_score)}." + sent2 = ( + f"Top country: **{top_country}** ({_fmt_score(top_country_score)}); " + f"bottom: **{bot_country}** ({_fmt_score(bot_country_score)})." ) else: - sent3 = ( - f"Within the {pillar_name} pillar, {top_country} was the only country " - f"with available data, scoring {_fmt_score(top_country_score)}." - ) + sent2 = f"**{top_country}** was the only country with data ({_fmt_score(top_country_score)})." - if yoy_val is not None: - direction_word = "improved" if yoy_val >= 0 else "declined" - sent4 = ( - f"Compared to the previous year, the {pillar_name} pillar " - f"{direction_word} by {abs(yoy_val):.2f} points" - ) - else: - sent4 = ( - f"No prior-year data is available to calculate year-over-year change " - f"for the {pillar_name} pillar in {year}" + # Sentence 3: strongest / weakest pillar + sent3 = "" + if strongest_pillar and weakest_pillar: + sent3 = ( + f"Strongest pillar: **{strongest_pillar}** ({_fmt_score(strongest_score)}); " + f"weakest: **{weakest_pillar}** ({_fmt_score(weakest_score)})." ) - if (most_improved_pillar and most_improved_delta is not None - and most_declined_pillar and most_declined_delta is not None - and most_improved_pillar != most_declined_pillar): - sent4 += ( - f". Across all pillars, {most_improved_pillar} showed the greatest improvement " - f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " - f"recorded the largest decline ({_fmt_delta(most_declined_delta)} pts)" - ) - - sent4 += "." - sent4 = sent4[0].upper() + sent4[1:] + # Sentence 4: most improved / declined pillar + sent4_parts = [] + if most_improved_pillar and most_improved_delta is not None: + sent4_parts.append(f"Best gain: **{most_improved_pillar}** ({_fmt_delta(most_improved_delta)} pts)") + if most_declined_pillar and most_declined_delta is not None: + sent4_parts.append(f"largest drop: **{most_declined_pillar}** ({_fmt_delta(most_declined_delta)} pts)") + sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else "" + if sent4: + sent4 = sent4[0].upper() + sent4[1:] return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) @@ -610,10 +540,6 @@ class FoodSecurityAggregator: # ========================================================================= # METADATA BUILDER - # Menyesuaikan dengan signature: save_etl_metadata(client, metadata: dict) - # dan skema etl_metadata: source_class, table_name, execution_timestamp, - # duration_seconds, rows_fetched, rows_transformed, rows_loaded, - # completeness_pct, config_snapshot, validation_metrics # ========================================================================= def _build_etl_metadata( @@ -1419,50 +1345,7 @@ class FoodSecurityAggregator: status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" self.logger.info(f" -> {status} (n_checked={len(check)})") - def _build_etl_metadata( - self, - table_name: str, - rows_loaded: int, - start_time: datetime, - end_time: datetime, - status: str, - error_msg: str = None, - ) -> dict: - """ - Susun dict metadata sesuai signature save_etl_metadata(client, metadata: dict) - dan kolom skema etl_metadata di bigquery_helpers.py: - source_class, table_name, execution_timestamp, duration_seconds, - rows_fetched, rows_transformed, rows_loaded, completeness_pct, - config_snapshot, validation_metrics - """ - duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0 - return { - "source_class" : "FoodSecurityAggregator", - "table_name" : table_name, - "execution_timestamp": start_time or end_time, - "duration_seconds" : round(duration, 4), - "rows_fetched" : rows_loaded, - "rows_transformed" : rows_loaded, - "rows_loaded" : rows_loaded, - "completeness_pct" : 100.0 if status == "success" else 0.0, - "config_snapshot" : json.dumps({ - "layer" : "gold", - "write_disposition" : "WRITE_TRUNCATE", - "normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY, - "performance_threshold" : PERFORMANCE_THRESHOLD, - "status" : status, - }), - "validation_metrics" : json.dumps({ - "status" : status, - "error_msg": error_msg or "", - }), - } - def _finalize(self, table_name: str, rows_loaded: int): - """ - Tandai tabel sukses. Catat ke etl_logs dan etl_metadata. - Pemanggilan: save_etl_metadata(client, metadata_dict) - """ end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") @@ -1486,7 +1369,6 @@ class FoodSecurityAggregator: ) ) except Exception as meta_err: - # Error metadata tidak boleh menghentikan pipeline self.logger.warning( f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}" ) @@ -1494,10 +1376,6 @@ class FoodSecurityAggregator: self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") def _fail(self, table_name: str, error: Exception): - """ - Tandai tabel gagal. Catat ke etl_logs dan etl_metadata. - Pemanggilan: save_etl_metadata(client, metadata_dict) - """ end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") error_msg = str(error) @@ -1579,10 +1457,6 @@ class FoodSecurityAggregator: # ============================================================================= def run_aggregation(): - """ - Airflow task: Hitung semua agregasi dari fact_asean_food_security_selected. - Dipanggil setelah analytical_layer_to_gold selesai. - """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() agg = FoodSecurityAggregator(client)