From 4bab7467792bae8b141fe93937242305f3dc810b Mon Sep 17 00:00:00 2001 From: Debby Date: Tue, 12 May 2026 09:55:15 +0700 Subject: [PATCH] create Indonesian text column --- .../bigquery_aggraget_fact_selected_layer.py | 673 +++++++++-------- scripts/bigquery_aggregate_layer.py | 687 +++++++++++------- 2 files changed, 751 insertions(+), 609 deletions(-) diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index cf7ae4c..0376e5a 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -32,21 +32,22 @@ Output Schema (agg_indicator_norm): year, country_id, country_name, indicator_id, indicator_name, unit, direction, pillar_id, pillar_name, - framework, -- "MDGs" | "SDGs" - value, -- raw value asli - norm_value, -- 0-1, direction sudah diperhitungkan - norm_score_1_100, -- scaled 1-100 (global per indikator) - yoy_value, -- perubahan absolut value YoY - yoy_norm_value, -- perubahan absolut norm_value YoY - performance -- "Good" | "Bad" | null + framework, + value, + norm_value, + norm_score_1_100, + yoy_value, + yoy_norm_value, + performance ============================================================================= agg_narrative_indicator ============================================================================= Tujuan: - Menghasilkan narasi otomatis 1 paragraf per indikator (level ASEAN, - merangkum seluruh periode + seluruh negara), dijalankan otomatis setelah - agg_indicator_norm selesai dalam pipeline yang sama. + Menghasilkan narasi otomatis per indikator (granularity: indicator_id). + Narasi membaca kondisi nyata dari data: tren, gap, anomali, konsistensi. + Tersedia dalam dua bahasa: Inggris (narrative_en) dan Indonesia (narrative_id). + Tanpa markdown bold (**) agar aman ditampilkan di Looker Studio. 
Granularity: indicator_id (all years, all ASEAN countries) @@ -57,11 +58,12 @@ Output Schema (agg_narrative_indicator): year_min, year_max, n_countries, avg_value_first, avg_value_last, avg_norm_score_1_100, - performance, -- Good | Bad | null + performance, n_yoy_total, n_yoy_positive, best_yoy_from, best_yoy_to, country_worst, country_best, - narrative + narrative_en, + narrative_id """ import pandas as pd @@ -86,10 +88,8 @@ from google.cloud import bigquery # ============================================================================= SDG_ONLY_KEYWORDS: frozenset = frozenset([ - # TARGET 2.1.1 - Undernourishment "prevalence of undernourishment (percent) (3-year average)", "number of people undernourished (million) (3-year average)", - # TARGET 2.1.2 - Food Insecurity (FIES) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", @@ -102,16 +102,12 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([ "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.1 - Stunting "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", "number of children under 5 years of age who are stunted (modeled estimates) (million)", - # TARGET 2.2.2 - Wasting "percentage of children under 5 years affected by wasting (percent)", "number of children under 5 years affected by wasting (million)", - # TARGET 2.2.2 - Overweight (children) "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", "number of children under 5 years of age who are overweight (modeled estimates) 
(million)", - # TARGET 2.2.3 - Anaemia "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) @@ -168,28 +164,19 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: - """ - Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year. - Kolom yang ditambahkan: yoy_value, yoy_norm_value. - Baris pertama tiap grup selalu null. - """ df = df.sort_values("year").copy() - df["value_prev"] = df["value"].shift(1) df["norm_value_prev"] = df["norm_value"].shift(1) - df["yoy_value"] = np.where( df["value"].notna() & df["value_prev"].notna(), df["value"] - df["value_prev"], np.nan, ) - df["yoy_norm_value"] = np.where( df["norm_value"].notna() & df["norm_value_prev"].notna(), df["norm_value"] - df["norm_value_prev"], np.nan, ) - df = df.drop(columns=["value_prev", "norm_value_prev"]) return df @@ -199,18 +186,163 @@ def _is_lower_better(direction: str) -> bool: # ============================================================================= -# NARRATIVE BUILDER — agg_narrative_indicator +# NARRATIVE CONDITION DETECTORS # ============================================================================= -def _build_narrative_per_indicator(row: pd.Series) -> str: +def _detect_trend(scores_by_year: pd.Series, lower_better: bool) -> str: """ - Narrative format (no em-dash, bold on key figures): - **{indicator}** ({framework}, {pillar}): ASEAN average {rose/fell} from - **{first}** to **{last}** ({year_min} to {year_max}), **{improving/deteriorating}** trend. - Score: **{score}/100** (*{Good/Bad}*). - Best country: **{best}**; worst: **{worst}**. - Improved in **{n_pos}/{n_total}** YoY transitions. + Deteksi tren: improving_consistent, improving_slowing, fluctuating, deteriorating. + scores_by_year: Series dengan index=year, value=avg_score (sudah direction-aware). 
""" + if len(scores_by_year) < 3: + return "insufficient_data" + + years = sorted(scores_by_year.index) + vals = [scores_by_year[y] for y in years if not pd.isna(scores_by_year.get(y, np.nan))] + + if len(vals) < 3: + return "insufficient_data" + + # Hitung slope keseluruhan + x = np.arange(len(vals)) + slope = np.polyfit(x, vals, 1)[0] + + # Slope positif = skor naik = baik untuk higher_better, buruk untuk lower_better + improving = (slope > 0 and not lower_better) or (slope < 0 and lower_better) + + # Hitung apakah laju melambat: bandingkan slope paruh pertama vs paruh kedua + mid = len(vals) // 2 + first_half = vals[:mid] + second_half = vals[mid:] + slope1 = np.polyfit(np.arange(len(first_half)), first_half, 1)[0] if len(first_half) > 1 else 0 + slope2 = np.polyfit(np.arange(len(second_half)), second_half, 1)[0] if len(second_half) > 1 else 0 + + # Koefisien variasi untuk cek fluktuasi + cv = np.std(vals) / (np.mean(vals) + 1e-9) + + if cv > 0.25: + return "fluctuating" + + if improving: + # Cek apakah melambat + if lower_better: + slowing = slope2 > slope1 # slope negatif mengecil artinya melambat + else: + slowing = slope2 < slope1 # slope positif mengecil artinya melambat + return "improving_slowing" if slowing else "improving_consistent" + else: + return "deteriorating" + + +def _detect_gap_trend(df_ind: pd.DataFrame, lower_better: bool) -> str: + """ + Deteksi apakah gap antar negara melebar, menyempit, atau stabil. 
+ df_ind: rows untuk 1 indikator, kolom: year, country_id, value + """ + std_by_year = ( + df_ind.groupby("year")["value"] + .std() + .dropna() + ) + if len(std_by_year) < 3: + return "unknown" + + years = sorted(std_by_year.index) + stds = [std_by_year[y] for y in years] + slope = np.polyfit(np.arange(len(stds)), stds, 1)[0] + + if abs(slope) < 0.01 * np.mean(stds): + return "stable" + return "widening" if slope > 0 else "narrowing" + + +def _detect_anomaly_year(scores_by_year: pd.Series) -> tuple: + """ + Deteksi tahun dengan perubahan paling ekstrem (naik atau turun tajam). + Return: (anomaly_year, direction) atau (None, None) + """ + if len(scores_by_year) < 3: + return None, None + + years = sorted(scores_by_year.index) + deltas = {} + for i in range(1, len(years)): + y_prev = years[i - 1] + y_curr = years[i] + v_prev = scores_by_year.get(y_prev, np.nan) + v_curr = scores_by_year.get(y_curr, np.nan) + if not pd.isna(v_prev) and not pd.isna(v_curr): + deltas[y_curr] = v_curr - v_prev + + if not deltas: + return None, None + + max_drop_year = min(deltas, key=deltas.get) + max_rise_year = max(deltas, key=deltas.get) + + threshold = 1.5 * np.std(list(deltas.values())) + if abs(deltas[max_drop_year]) > threshold and deltas[max_drop_year] < 0: + return max_drop_year, "drop" + if abs(deltas[max_rise_year]) > threshold and deltas[max_rise_year] > 0: + return max_rise_year, "rise" + + return None, None + + +def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple: + """ + Cari negara yang paling konsisten terbaik dan terburuk. + Return: (consistent_best, consistent_worst, is_consistent) + """ + country_avg = ( + df_ind.groupby("country_name")["value"] + .mean() + .dropna() + ) + if country_avg.empty: + return None, None, False + + if lower_better: + best = country_avg.idxmin() + worst = country_avg.idxmax() + else: + best = country_avg.idxmax() + worst = country_avg.idxmin() + + # Cek konsistensi: apakah negara terbaik selalu di atas rata-rata? 
+ asean_avg_by_year = df_ind.groupby("year")["value"].mean() + country_by_year = df_ind[df_ind["country_name"] == best].set_index("year")["value"] + + years_both = set(asean_avg_by_year.index) & set(country_by_year.index) + if not years_both: + return best, worst, False + + if lower_better: + consistent = all( + country_by_year[y] <= asean_avg_by_year[y] + for y in years_both + if not pd.isna(country_by_year.get(y, np.nan)) + ) + else: + consistent = all( + country_by_year[y] >= asean_avg_by_year[y] + for y in years_both + if not pd.isna(country_by_year.get(y, np.nan)) + ) + + return best, worst, consistent + + +# ============================================================================= +# NARRATIVE BUILDER — plain text, no markdown, bilingual +# ============================================================================= + +def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tuple: + """ + Bangun narasi interpretatif per indikator berdasarkan kondisi nyata data. + Return: (narrative_en, narrative_id) — plain text tanpa markdown bold. 
+ """ + ind_id = int(row["indicator_id"]) ind_name = str(row["indicator_name"]).strip() unit = str(row["unit"]).strip() if row["unit"] else "" direction = str(row["direction"]).strip() @@ -218,85 +350,109 @@ def _build_narrative_per_indicator(row: pd.Series) -> str: framework = str(row["framework"]).strip() year_min = int(row["year_min"]) year_max = int(row["year_max"]) - n_countries = int(row["n_countries"]) - avg_score = row["avg_norm_score_1_100"] - performance = row["performance"] - - avg_first = row["avg_value_first"] - avg_last = row["avg_value_last"] - - n_yoy_total = int(row["n_yoy_total"]) if not pd.isna(row["n_yoy_total"]) else 0 - n_yoy_positive = int(row["n_yoy_positive"]) if not pd.isna(row["n_yoy_positive"]) else 0 - - best_yoy_from = row["best_yoy_from"] - best_yoy_to = row["best_yoy_to"] - - country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None - country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None - lower_better = _is_lower_better(direction) - def _fmt(v): + # Subset data untuk indikator ini + df_ind = df_full[df_full["indicator_id"] == ind_id].copy() + + if df_ind.empty: + na_en = f"{ind_name} ({framework}, {pillar}): Insufficient data for analysis." + na_id = f"{ind_name} ({framework}, {pillar}): Data tidak cukup untuk dianalisis." 
+ return na_en, na_id + + # ---- Hitung kondisi dari data ---- + asean_avg_by_year = ( + df_ind.groupby("year")["value"].mean().dropna() + ) + + trend_label = _detect_trend(asean_avg_by_year, lower_better) + gap_label = _detect_gap_trend(df_ind, lower_better) + anomaly_year, anomaly_dir = _detect_anomaly_year(asean_avg_by_year) + best_country, worst_country, is_consistent = _detect_consistency(df_ind, lower_better) + + avg_first = row.get("avg_value_first", np.nan) + avg_last = row.get("avg_value_last", np.nan) + + def fmt(v): if pd.isna(v): return "N/A" abs_v = abs(v) - if abs_v >= 1000: - s = f"{v:,.1f}" - elif abs_v >= 10: - s = f"{v:.2f}" - else: - s = f"{v:.3f}" + s = f"{v:,.1f}" if abs_v >= 1000 else (f"{v:.2f}" if abs_v >= 10 else f"{v:.3f}") return f"{s} {unit}".strip() if unit else s - # Sentence 1: trend first -> last - if not pd.isna(avg_first) and not pd.isna(avg_last): - diff = avg_last - avg_first - is_improving = (diff < 0) if lower_better else (diff > 0) - trend_label = "improving" if is_improving else "deteriorating" - verb = "fell" if diff < 0 else "rose" - sent1 = ( - f"**{ind_name}** ({framework}, {pillar}): ASEAN average {verb} from " - f"**{_fmt(avg_first)}** to **{_fmt(avg_last)}** ({year_min} to {year_max}), " - f"**{trend_label}** trend." - ) - else: - sent1 = ( - f"**{ind_name}** ({framework}, {pillar}): trend data unavailable " - f"({year_min} to {year_max}, {n_countries} members)." - ) + # ---- Bangun kalimat EN ---- + sentences_en = [] + sentences_id = [] - # Sentence 2: score + performance - if not pd.isna(avg_score): - perf_label = f"*{performance}*" if performance in ("Good", "Bad") else "" - sent2 = f"Score: **{avg_score:.1f}/100** {perf_label}.".strip() - else: - sent2 = "Score unavailable." 
+ # Kalimat 1: konteks indikator + s1_en = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):" + s1_id = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):" + sentences_en.append(s1_en) + sentences_id.append(s1_id) - # Sentence 3: best / worst country - if country_best and country_worst and country_best != country_worst: - sent3 = f"Best country: **{country_best}**; worst: **{country_worst}**." - elif country_best: - sent3 = f"Best country: **{country_best}**." - else: - sent3 = "" + # Kalimat 2: tren keseluruhan + trend_map_en = { + "improving_consistent": f"Regional average improved consistently from {fmt(avg_first)} to {fmt(avg_last)}.", + "improving_slowing": f"Regional average improved from {fmt(avg_first)} to {fmt(avg_last)}, though the pace slowed in recent years.", + "deteriorating": f"Regional average worsened from {fmt(avg_first)} to {fmt(avg_last)} over the period.", + "fluctuating": f"Regional average fluctuated between {fmt(avg_first)} and {fmt(avg_last)} with no clear trend.", + "insufficient_data": f"Trend analysis is limited due to sparse data.", + } + trend_map_id = { + "improving_consistent": f"Rata-rata regional membaik secara konsisten dari {fmt(avg_first)} menjadi {fmt(avg_last)}.", + "improving_slowing": f"Rata-rata regional membaik dari {fmt(avg_first)} menjadi {fmt(avg_last)}, namun lajunya melambat dalam beberapa tahun terakhir.", + "deteriorating": f"Rata-rata regional memburuk dari {fmt(avg_first)} menjadi {fmt(avg_last)} sepanjang periode.", + "fluctuating": f"Rata-rata regional berfluktuasi antara {fmt(avg_first)} dan {fmt(avg_last)} tanpa tren yang jelas.", + "insufficient_data": f"Analisis tren terbatas karena data yang tersedia tidak cukup.", + } + sentences_en.append(trend_map_en.get(trend_label, "")) + sentences_id.append(trend_map_id.get(trend_label, "")) - # Sentence 4: YoY transitions - if n_yoy_total > 0: - best_period = "" - if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to): - best_period = f", 
best gain: **{int(best_yoy_from)} to {int(best_yoy_to)}**" - sent4 = ( - f"Improved in **{n_yoy_positive}/{n_yoy_total}** YoY transitions{best_period}." - ) - else: - sent4 = "Insufficient data for YoY assessment." + # Kalimat 3: gap antar negara + if gap_label == "widening": + sentences_en.append("Disparity among ASEAN countries has widened over time, indicating unequal progress.") + sentences_id.append("Kesenjangan antar negara ASEAN melebar seiring waktu, menunjukkan kemajuan yang tidak merata.") + elif gap_label == "narrowing": + sentences_en.append("Disparity among ASEAN countries has narrowed, suggesting more balanced regional progress.") + sentences_id.append("Kesenjangan antar negara ASEAN menyempit, mengindikasikan kemajuan regional yang lebih merata.") + elif gap_label == "stable": + sentences_en.append("The gap among ASEAN countries remained relatively stable throughout the period.") + sentences_id.append("Kesenjangan antar negara ASEAN relatif stabil sepanjang periode.") - parts = [sent1, sent2] - if sent3: - parts.append(sent3) - parts.append(sent4) + # Kalimat 4: anomali + if anomaly_year is not None: + if anomaly_dir == "drop": + sentences_en.append(f"A notable decline was recorded in {anomaly_year}, which stood out from the overall pattern.") + sentences_id.append(f"Penurunan signifikan tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.") + elif anomaly_dir == "rise": + sentences_en.append(f"A sharp improvement was observed in {anomaly_year}, standing out from the overall pattern.") + sentences_id.append(f"Peningkatan tajam tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.") - return " ".join(parts) + # Kalimat 5: konsistensi negara terbaik/terburuk + if best_country and worst_country: + if is_consistent: + sentences_en.append( + f"{best_country} consistently performed above the regional average, " + f"while {worst_country} consistently lagged behind." 
+ ) + sentences_id.append( + f"{best_country} secara konsisten berada di atas rata-rata regional, " + f"sementara {worst_country} secara konsisten tertinggal." + ) + else: + sentences_en.append( + f"Overall, {best_country} showed the best performance, " + f"while {worst_country} had the weakest results across the period." + ) + sentences_id.append( + f"Secara keseluruhan, {best_country} menunjukkan performa terbaik, " + f"sementara {worst_country} memiliki hasil terlemah sepanjang periode." + ) + + narrative_en = " ".join(s for s in sentences_en if s) + narrative_id = " ".join(s for s in sentences_id if s) + + return narrative_en, narrative_id # ============================================================================= @@ -308,27 +464,6 @@ class IndicatorNormAggregator: Hitung norm_value per indikator untuk seluruh data di fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm. Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator. - - Alur agg_indicator_norm: - 1. Load fact_asean_food_security_selected - 2. Load dim_indicator -> ambil kolom unit - 3. Merge unit ke df - 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) - 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label - 6. Hitung norm_value per indikator (direction-aware, 0-1) - 7. Hitung YoY per (indicator_id, country_id) - 8. Scale ke 1-100 per indikator (global) - 9. Assign performance label (Good/Bad) - 10. Simpan ke BigQuery -> agg_indicator_norm - 11. Summary log agg_indicator_norm - - Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada): - 12. Agregasi ke level ASEAN per indicator_id - 13. Hitung YoY avg_value per indikator - 14. Assign performance berdasarkan avg_norm_score - 15. Build narrative 1 paragraf per indikator - 16. Simpan ke BigQuery -> agg_narrative_indicator - 17. 
Summary log agg_narrative_indicator """ def __init__(self, client: bigquery.Client): @@ -432,10 +567,10 @@ class IndicatorNormAggregator: self.logger.info("STEP 3: MERGE UNIT -> fact df") self.logger.info("=" * 80) - before = len(self.df) - self.df = self.df.merge(self.df_unit, on="indicator_id", how="left") + before = len(self.df) + self.df = self.df.merge(self.df_unit, on="indicator_id", how="left") self.df["unit"] = self.df["unit"].fillna("").astype(str) - after = len(self.df) + after = len(self.df) assert before == after, ( f"Row count berubah setelah merge unit: {before} -> {after}" @@ -445,11 +580,6 @@ class IndicatorNormAggregator: self.logger.info( f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}" ) - unique_units = self.df["unit"].value_counts().to_dict() - self.logger.info(" Distribusi unit (top 10):") - for u, cnt in list(unique_units.items())[:10]: - label = repr(u) if u == "" else u - self.logger.info(f" {label:<30}: {cnt:,} rows") # ========================================================================= # STEP 4: Deteksi sdgs_start_year @@ -465,12 +595,7 @@ class IndicatorNormAggregator: ] if not fies_rows.empty: sdgs_start = int(fies_rows["year"].min()) - n_fies_ind = fies_rows["indicator_name"].nunique() self.logger.info(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}") - self.logger.info(f" FIES indicators found: {n_fies_ind}, first year = {sdgs_start}") - for nm in fies_rows["indicator_name"].unique(): - min_y = int(fies_rows[fies_rows["indicator_name"] == nm]["year"].min()) - self.logger.info(f" - {nm[:60]} (first year: {min_y})") return sdgs_start self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar") @@ -480,7 +605,6 @@ class IndicatorNormAggregator: .rename(columns={"year": "min_year"}) ) unique_years = sorted(ind_min_year["min_year"].unique()) - self.logger.info(f" Unique min_year per indikator: {unique_years}") if len(unique_years) == 1: sdgs_start = int(unique_years[0]) + 9999 @@ -510,13 
+634,11 @@ class IndicatorNormAggregator: self.logger.info("=" * 80) df = self.df.copy() - df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER) df["framework"] = "MDGs" mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year) df.loc[mask_sdgs, "framework"] = "SDGs" - df = df.drop(columns=["_is_sdg_kw"]) fw_dist = df["framework"].value_counts() @@ -524,32 +646,10 @@ class IndicatorNormAggregator: for fw, cnt in fw_dist.items(): self.logger.info(f" {fw:<6}: {cnt:,} rows") - dual = ( - df.groupby("indicator_id")["framework"] - .nunique() - .reset_index() - .rename(columns={"framework": "n_frameworks"}) - ) - dual_ids = dual[dual["n_frameworks"] > 1]["indicator_id"].tolist() - self.logger.info( - f"\n Indikator dengan DUAL framework (MDGs + SDGs): {len(dual_ids)}" - ) - if dual_ids: - for iid in dual_ids: - ind_name = df[df["indicator_id"] == iid]["indicator_name"].iloc[0] - yr_range = df[df["indicator_id"] == iid][["year", "framework"]].drop_duplicates() - mdgs_yrs = sorted(yr_range[yr_range["framework"] == "MDGs"]["year"].tolist()) - sdgs_yrs = sorted(yr_range[yr_range["framework"] == "SDGs"]["year"].tolist()) - self.logger.info( - f" [{iid}] {ind_name[:55]}\n" - f" MDGs years: {mdgs_yrs}\n" - f" SDGs years: {sdgs_yrs}" - ) - self.df = df # ========================================================================= - # STEP 6: Hitung norm_value per indikator (direction-aware) + # STEP 6: Hitung norm_value per indikator # ========================================================================= def _compute_norm_values(self) -> pd.DataFrame: @@ -595,17 +695,11 @@ class IndicatorNormAggregator: norm_parts.append(grp) df_normed = pd.concat(norm_parts, ignore_index=True) - self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators") - self.logger.info( - f" norm_value range : " - f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}" - ) - self.logger.info(f" norm_value nulls : 
{df_normed['norm_value'].isna().sum()}") return df_normed # ========================================================================= - # STEP 7: Hitung YoY per (indicator_id, country_id) + # STEP 7: Hitung YoY # ========================================================================= def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame: @@ -621,21 +715,8 @@ class IndicatorNormAggregator: parts.append(_compute_yoy(grp)) df_out = pd.concat(parts, ignore_index=True) - - self.logger.info( - f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}" - ) - self.logger.info( - f" yoy_value range : " - f"{df_out['yoy_value'].min():.4f} - {df_out['yoy_value'].max():.4f}" - ) - self.logger.info( - f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}" - ) - self.logger.info( - f" yoy_norm_value range: " - f"{df_out['yoy_norm_value'].min():.4f} - {df_out['yoy_norm_value'].max():.4f}" - ) + self.logger.info(f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}") + self.logger.info(f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}") return df_out # ========================================================================= @@ -690,7 +771,7 @@ class IndicatorNormAggregator: return df # ========================================================================= - # STEP 10: Save agg_indicator_norm to BigQuery + # STEP 10: Save agg_indicator_norm # ========================================================================= def _save(self, df: pd.DataFrame) -> int: @@ -701,22 +782,11 @@ class IndicatorNormAggregator: self.logger.info("=" * 80) out = df[[ - "year", - "country_id", - "country_name", - "indicator_id", - "indicator_name", - "unit", - "direction", - "pillar_id", - "pillar_name", - "framework", - "value", - "norm_value", - "norm_score_1_100", - "yoy_value", - "yoy_norm_value", - "performance", + "year", "country_id", "country_name", + "indicator_id", "indicator_name", "unit", "direction", + "pillar_id", "pillar_name", 
"framework", + "value", "norm_value", "norm_score_1_100", + "yoy_value", "yoy_norm_value", "performance", ]].copy() out = out.sort_values( @@ -740,13 +810,10 @@ class IndicatorNormAggregator: out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float) out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") - self.logger.info(f" Columns : {list(out.columns)}") self.logger.info(f" Total rows : {len(out):,}") self.logger.info(f" Countries : {out['country_id'].nunique()}") self.logger.info(f" Indicators : {out['indicator_id'].nunique()}") self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}") - self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}") - self.logger.info(f" Performance: {dict(out['performance'].value_counts())}") schema = [ bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), @@ -793,11 +860,6 @@ class IndicatorNormAggregator: "yoy_columns" : ["yoy_value", "yoy_norm_value"], "performance_threshold": _PERFORMANCE_THRESHOLD, "unit_source" : "dim_indicator", - "framework_logic" : ( - "SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, " - "SDGs if year >= sdgs_start_year. " - "Non-SDG_ONLY: always MDGs." 
- ), }), "validation_metrics" : json.dumps({ "total_rows" : rows_loaded, @@ -807,11 +869,10 @@ class IndicatorNormAggregator: }), } save_etl_metadata(self.client, metadata) - self.logger.info(" Metadata -> [AUDIT] etl_metadata") return rows_loaded # ========================================================================= - # STEP 11: Summary log agg_indicator_norm + # STEP 11: Summary log # ========================================================================= def _log_summary(self, df: pd.DataFrame): @@ -819,140 +880,81 @@ class IndicatorNormAggregator: self.logger.info("STEP 11: SUMMARY — agg_indicator_norm") self.logger.info("=" * 80) - summary = ( - df.groupby(["framework", "year"]) - .agg( - n_indicators=("indicator_id", "nunique"), - n_countries =("country_id", "nunique"), - avg_score =("norm_score_1_100", "mean"), - ) - .reset_index() - ) - self.logger.info( - f"\n{'Framework':<8} {'Year':<6} {'Indicators':<12} {'Countries':<12} {'Avg Score'}" - ) - self.logger.info("-" * 55) - for _, r in summary.iterrows(): - self.logger.info( - f"{r['framework']:<8} {int(r['year']):<6} " - f"{int(r['n_indicators']):<12} {int(r['n_countries']):<12} " - f"{r['avg_score']:.2f}" - ) - - self.logger.info("\n Performance summary per Framework:") - perf_fw = ( - df[df["performance"].notna()] - .groupby(["framework", "performance"]) - .size() - .reset_index(name="count") - ) - for fw in perf_fw["framework"].unique(): - sub = perf_fw[perf_fw["framework"] == fw] - total = sub["count"].sum() - self.logger.info(f" [{fw}]") - for _, r in sub.iterrows(): - self.logger.info( - f" {r['performance']:<6}: {int(r['count']):,} " - f"({r['count']/total*100:.1f}%)" - ) - ind_avg = ( - df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"]) + df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"]) ["norm_score_1_100"].mean() .reset_index() .sort_values("norm_score_1_100", ascending=False) ) - self.logger.info( - "\n TOP 5 Indicators (avg 
norm_score_1_100 across all years & countries):" - ) + self.logger.info("\n TOP 5 Indicators (avg norm_score_1_100):") for _, r in ind_avg.head(5).iterrows(): - tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" - unit = f"[{r['unit']}]" if r["unit"] else "" + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" self.logger.info( f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} " - f"{r['norm_score_1_100']:.2f} {tag} {unit}" + f"{r['norm_score_1_100']:.2f} {tag}" ) self.logger.info("\n BOTTOM 5 Indicators:") for _, r in ind_avg.tail(5).iterrows(): - tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" - unit = f"[{r['unit']}]" if r["unit"] else "" + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" self.logger.info( f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} " - f"{r['norm_score_1_100']:.2f} {tag} {unit}" + f"{r['norm_score_1_100']:.2f} {tag}" ) - pillar_summary = ( - df.drop_duplicates(subset=["indicator_id", "pillar_name"]) - .groupby("pillar_name")["indicator_id"] - .count() - .reset_index() - .rename(columns={"indicator_id": "n_indicators"}) - ) - self.logger.info("\n Indicators per pillar:") - for _, r in pillar_summary.iterrows(): - self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") - # ========================================================================= - # STEP 12-16: agg_narrative_indicator + # STEP 12-17: agg_narrative_indicator # ========================================================================= def _build_narrative_table(self, df_final: pd.DataFrame): - """ - Pipeline agg_narrative_indicator. - Granularity: per indicator_id (1 baris per indikator, all years, all countries). 
- """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 12-16: agg_narrative_indicator") - self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)") + self.logger.info("STEP 12-17: agg_narrative_indicator") + self.logger.info(" Granularity: per indicator_id (all years + all ASEAN countries)") + self.logger.info(" Narrative : interpretatif, plain text, bilingual EN/ID") self.logger.info("=" * 80) df = df_final.copy() - dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - # ---- 12a. ASEAN avg per (indicator_id, year) ------------------------- - self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---") + # ---- Agregasi statistik per indikator ---- df_yr = ( df.groupby(["indicator_id", "year"]) .agg( - avg_value =("value", "mean"), - avg_norm_score =("norm_score_1_100", "mean"), - n_countries_year =("country_id", "nunique"), + avg_value =("value", "mean"), + avg_norm_score =("norm_score_1_100", "mean"), + n_countries_yr =("country_id", "nunique"), ) .reset_index() ) - # ---- 12b. first / last avg value per indikator ----------------------- df_first = ( - df_yr.sort_values("year") - .groupby("indicator_id").first().reset_index() + df_yr.sort_values("year").groupby("indicator_id").first().reset_index() [["indicator_id", "year", "avg_value"]] .rename(columns={"year": "year_min", "avg_value": "avg_value_first"}) ) df_last = ( - df_yr.sort_values("year") - .groupby("indicator_id").last().reset_index() + df_yr.sort_values("year").groupby("indicator_id").last().reset_index() [["indicator_id", "year", "avg_value"]] .rename(columns={"year": "year_max", "avg_value": "avg_value_last"}) ) - - # ---- 12c. Rata-rata norm_score seluruh periode ----------------------- df_score_avg = ( df_yr.groupby("indicator_id") .agg(avg_norm_score_1_100=("avg_norm_score", "mean")) .reset_index() ) - - # ---- 12d. 
n_countries ------------------------------------------------ df_nc = ( df.groupby("indicator_id")["country_id"] .nunique().reset_index() .rename(columns={"country_id": "n_countries"}) ) - # ---- 12e. YoY per indicator (ASEAN avg) ------------------------------ - self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---") + # YoY stats + dir_map = ( + df[["indicator_id", "direction"]] + .drop_duplicates(subset=["indicator_id"]) + .set_index("indicator_id")["direction"] + .to_dict() + ) yoy_parts = [] for ind_id, grp in df_yr.groupby("indicator_id"): @@ -967,13 +969,6 @@ class IndicatorNormAggregator: yoy_parts.append(grp) df_yr = pd.concat(yoy_parts, ignore_index=True) - dir_map = ( - df[["indicator_id", "direction"]] - .drop_duplicates(subset=["indicator_id"]) - .set_index("indicator_id")["direction"] - .to_dict() - ) - def _is_positive_yoy(ind_id, yoy_val): if pd.isna(yoy_val): return False @@ -982,8 +977,8 @@ class IndicatorNormAggregator: yoy_stats = [] for ind_id, grp in df_yr.groupby("indicator_id"): - grp_yoy = grp[grp["yoy"].notna()].copy() - lb = _is_lower_better(dir_map.get(ind_id, "positive")) + grp_yoy = grp[grp["yoy"].notna()].copy() + lb = _is_lower_better(dir_map.get(ind_id, "positive")) n_total = len(grp_yoy) n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"])) @@ -1003,16 +998,14 @@ class IndicatorNormAggregator: "best_yoy_from" : best_yoy_from, "best_yoy_to" : best_yoy_to, }) - df_yoy_stats = pd.DataFrame(yoy_stats) - # ---- 12f. Country terbaik & terburuk --------------------------------- + # Country best/worst df_country_avg = ( df.groupby(["indicator_id", "country_id", "country_name"]) .agg(country_avg_value=("value", "mean")) .reset_index() ) - country_stats = [] for ind_id, grp in df_country_avg.groupby("indicator_id"): lb = _is_lower_better(dir_map.get(ind_id, "positive")) @@ -1029,13 +1022,11 @@ class IndicatorNormAggregator: }) df_country_stats = pd.DataFrame(country_stats) - # ---- 12g. 
Dimensi tetap per indikator -------------------------------- - df_dim = ( - df[["indicator_id"] + dim_cols] - .drop_duplicates(subset=["indicator_id"]) - ) + # Dim cols + dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] + df_dim = df[["indicator_id"] + dim_cols].drop_duplicates(subset=["indicator_id"]) - # ---- 12h. Merge semua ------------------------------------------------ + # Merge semua df_agg = ( df_dim .merge(df_first, on="indicator_id", how="left") @@ -1046,32 +1037,32 @@ class IndicatorNormAggregator: .merge(df_country_stats, on="indicator_id", how="left") ) - self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}") - self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}") - - # -- STEP 14: Assign performance --------------------------------------- - self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") + # Performance df_agg["performance"] = pd.NA has_score = df_agg["avg_norm_score_1_100"].notna() df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" - n_good = (df_agg["performance"] == "Good").sum() - n_bad = (df_agg["performance"] == "Bad").sum() - self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") - # -- STEP 15: Build narrative ------------------------------------------ - self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---") - df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1) + # ---- Build narrative (bilingual, interpretatif, plain text) ---- + self.logger.info("\n--- BUILD NARRATIVE (interpretatif, plain text, bilingual EN/ID) ---") + narratives_en = [] + narratives_id = [] + + for _, row in df_agg.iterrows(): + n_en, n_id = _build_narrative_per_indicator(row, df) + narratives_en.append(n_en) + narratives_id.append(n_id) + + df_agg["narrative_en"] = narratives_en + 
df_agg["narrative_id"] = narratives_id + self.logger.info(f" Narratives generated: {len(df_agg):,}") - self.logger.info("\n Sample (first 2):") - for _, row in df_agg.head(2).iterrows(): - self.logger.info( - f"\n [{int(row['indicator_id'])}] {row['indicator_name'][:60]}" - f"\n -> {row['narrative'][:300]}..." - ) + self.logger.info("\n Sample EN (first):") + self.logger.info(f" {df_agg.iloc[0]['narrative_en'][:300]}") + self.logger.info("\n Sample ID (first):") + self.logger.info(f" {df_agg.iloc[0]['narrative_id'][:300]}") - # -- STEP 16: Save ----------------------------------------------------- - self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") + # ---- Save ---- out = df_agg[[ "indicator_id", "indicator_name", "unit", "direction", "pillar_name", "framework", @@ -1081,7 +1072,7 @@ class IndicatorNormAggregator: "n_yoy_total", "n_yoy_positive", "best_yoy_from", "best_yoy_to", "country_worst", "country_best", - "narrative", + "narrative_en", "narrative_id", ]].copy() out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True) @@ -1105,7 +1096,8 @@ class IndicatorNormAggregator: out["best_yoy_to"] = pd.to_numeric(out["best_yoy_to"], errors="coerce").astype("Int64") out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string") out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string") - out["narrative"] = out["narrative"].astype(str) + out["narrative_en"] = out["narrative_en"].astype(str) + out["narrative_id"] = out["narrative_id"].astype(str) schema = [ bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), @@ -1127,7 +1119,8 @@ class IndicatorNormAggregator: bigquery.SchemaField("best_yoy_to", "INTEGER", mode="NULLABLE"), bigquery.SchemaField("country_worst", "STRING", mode="NULLABLE"), bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"), - bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), + 
bigquery.SchemaField("narrative_en", "STRING", mode="NULLABLE"), + bigquery.SchemaField("narrative_id", "STRING", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -1152,7 +1145,8 @@ class IndicatorNormAggregator: "config_snapshot" : json.dumps({ "source_table" : "agg_indicator_norm (in-memory df_final)", "granularity" : "indicator_id only (all years, all ASEAN countries)", - "aggregation" : "full-period summary per indicator", + "narrative_style" : "interpretive, plain text, no markdown, bilingual EN/ID", + "narrative_dimensions" : ["trend", "gap_trend", "anomaly", "country_consistency"], "performance_threshold": _PERFORMANCE_THRESHOLD, "layer" : "gold", }), @@ -1162,8 +1156,6 @@ class IndicatorNormAggregator: }), } save_etl_metadata(self.client, metadata) - self.logger.info(" Metadata -> [AUDIT] etl_metadata") - self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded # ========================================================================= @@ -1194,7 +1186,6 @@ class IndicatorNormAggregator: rows_loaded = self._save(df_final) self.pipeline_metadata["rows_loaded"] = rows_loaded self._log_summary(df_final) - self._build_narrative_table(df_final) self.pipeline_metadata["end_time"] = datetime.now() @@ -1217,10 +1208,6 @@ class IndicatorNormAggregator: # ============================================================================= def run_indicator_norm_aggregation(): - """ - Airflow task: Build agg_indicator_norm + agg_narrative_indicator. - Dipanggil setelah analytical_layer_to_gold selesai. 
- """ client = get_bigquery_client() agg = IndicatorNormAggregator(client) agg.run() @@ -1241,10 +1228,6 @@ if __name__ == "__main__": print("=" * 80) print("INDICATOR NORM AGGREGATION -> fs_asean_gold") - print(" Source : fact_asean_food_security_selected") - print(" Dim : dim_indicator (unit)") - print(" Output : agg_indicator_norm") - print(" agg_narrative_indicator") print("=" * 80) logger = setup_logging() diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index e2d38f9..9597549 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -6,18 +6,14 @@ UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_pillar_by_country - agg_framework_by_country - agg_framework_asean (+ kolom performance_status: 'Good'/'Bad', threshold=60) - - agg_narrative_overview - - agg_narrative_pillar + - agg_narrative_overview (bilingual: narrative_en, narrative_id) + - agg_narrative_pillar (bilingual: narrative_en, narrative_id) -SOURCE TABLE: fact_asean_food_security_selected (sudah include nama + ID) - -n_indicators logic (sesuai agg_indicator_norm): - - Setiap tahun dihitung dari indikator yang benar-benar hadir di tahun tsb. - - Framework MDGs/SDGs per tahun mengikuti SDG_ONLY_KEYWORDS: - * Indikator tidak di SDG_ONLY -> selalu MDGs - * Indikator di SDG_ONLY + year >= sdgs_start_year -> SDGs - * Indikator di SDG_ONLY + year < sdgs_start_year -> MDGs - - Sehingga n_indicators MDGs dan SDGs bisa berbeda antar tahun. 
+Narrative style: + - Plain text, tanpa markdown bold (**) + - Interpretatif: membaca tren, gap, anomali, konsistensi dari data nyata + - Bilingual: narrative_en (Inggris) + narrative_id (Indonesia) + - Granularity: per tahun (Overview & Pillar) """ import pandas as pd @@ -50,11 +46,8 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({ }) NORMALIZE_FRAMEWORKS_JOINTLY = False +PERFORMANCE_THRESHOLD = 60.0 -# Threshold performance_status di agg_framework_asean -PERFORMANCE_THRESHOLD = 60.0 # score >= 60 -> "Good", < 60 -> "Bad" - -# SDG_ONLY_KEYWORDS (sama persis dengan bigquery_aggraget_fact_selected_layer.py) SDG_ONLY_KEYWORDS: frozenset = frozenset([ "prevalence of undernourishment (percent) (3-year average)", "number of people undernourished (million) (3-year average)", @@ -90,7 +83,7 @@ _FIES_DETECTION_LOWER: frozenset = frozenset([ # ============================================================================= -# Windows CP1252 safe logging +# WINDOWS CP1252 SAFE LOGGING # ============================================================================= class _SafeStreamHandler(logging.StreamHandler): @@ -178,16 +171,11 @@ def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger= def _performance_status(score) -> str: - """Classify score into 'Good' or 'Bad' based on PERFORMANCE_THRESHOLD.""" if score is None or (isinstance(score, float) and np.isnan(score)): return "N/A" return "Good" if score >= PERFORMANCE_THRESHOLD else "Bad" -# ============================================================================= -# NARRATIVE HELPERS -# ============================================================================= - def _fmt_score(score) -> str: if score is None or (isinstance(score, float) and np.isnan(score)): return "N/A" @@ -201,80 +189,235 @@ def _fmt_delta(delta) -> str: return f"{sign}{delta:.2f}" +# ============================================================================= +# NARRATIVE CONDITION DETECTORS (shared) +# 
============================================================================= + +def _detect_series_trend(scores: list) -> str: + """ + Deteksi tren dari list skor berurutan. + Return: 'improving_consistent' | 'improving_slowing' | 'deteriorating' | 'fluctuating' + """ + if len(scores) < 3: + return "insufficient" + + x = np.arange(len(scores)) + slope = np.polyfit(x, scores, 1)[0] + cv = np.std(scores) / (np.mean(scores) + 1e-9) + + if cv > 0.20: + return "fluctuating" + + mid = len(scores) // 2 + slope1 = np.polyfit(np.arange(mid), scores[:mid], 1)[0] if mid > 1 else slope + slope2 = np.polyfit(np.arange(len(scores) - mid), scores[mid:], 1)[0] if (len(scores) - mid) > 1 else slope + + if slope > 0: + slowing = slope2 < slope1 + return "improving_slowing" if slowing else "improving_consistent" + else: + return "deteriorating" + + +def _detect_country_gap(scores_by_country_year: pd.DataFrame, score_col: str) -> str: + """ + Deteksi apakah std antar negara melebar atau menyempit dari waktu ke waktu. + scores_by_country_year: df dengan kolom [year, country_id, score_col] + """ + std_by_year = ( + scores_by_country_year.groupby("year")[score_col] + .std().dropna() + ) + if len(std_by_year) < 3: + return "unknown" + + years = sorted(std_by_year.index) + stds = [std_by_year[y] for y in years] + slope = np.polyfit(np.arange(len(stds)), stds, 1)[0] + mean_s = np.mean(stds) + + if abs(slope) < 0.02 * mean_s: + return "stable" + return "widening" if slope > 0 else "narrowing" + + +def _find_anomaly_year(values_by_year: dict) -> tuple: + """ + Cari tahun dengan perubahan YoY paling ekstrem. 
+ values_by_year: {year: score} + Return: (year, 'drop' | 'rise') atau (None, None) + """ + years = sorted(values_by_year.keys()) + deltas = {} + for i in range(1, len(years)): + y0, y1 = years[i-1], years[i] + v0, v1 = values_by_year.get(y0), values_by_year.get(y1) + if v0 is not None and v1 is not None and not (pd.isna(v0) or pd.isna(v1)): + deltas[y1] = v1 - v0 + + if not deltas: + return None, None + + threshold = 1.5 * np.std(list(deltas.values())) + min_y = min(deltas, key=deltas.get) + max_y = max(deltas, key=deltas.get) + + if abs(deltas[min_y]) > threshold and deltas[min_y] < 0: + return min_y, "drop" + if abs(deltas[max_y]) > threshold and deltas[max_y] > 0: + return max_y, "rise" + return None, None + + +# ============================================================================= +# NARRATIVE BUILDER — OVERVIEW (per tahun) +# ============================================================================= + def _build_overview_narrative( year: int, - n_mdg: int, - n_sdg: int, - n_total_ind: int, score: float, performance_status: str, yoy_val, - yoy_pct, - prev_year: int, - prev_score, - prev_performance_status: str, + n_mdg: int, + n_sdg: int, ranking_list: list, most_improved_country, most_improved_delta, most_declined_country, most_declined_delta, -) -> str: + historical_scores: dict, # {year: score} semua tahun sebelumnya + country_scores_all: pd.DataFrame, # df [year, country_name, framework_score_1_100] +) -> tuple: """ - Narrative format (no em-dash): - In {year}, ASEAN scored {score} ({performance}) across {n_total} indicators - ({n_mdg} MDGs, {n_sdg} SDGs). Score {increased/decreased} by {delta} pts from - {prev_year} ({prev_score}). {top_country} led the region; {bottom_country} ranked - last. Biggest gain: {country}; biggest drop: {country}. + Narasi overview per tahun — interpretatif, plain text, bilingual. 
+ Return: (narrative_en, narrative_id) """ + sentences_en = [] + sentences_id = [] - # Sentence 1: score + performance + indicators - ind_parts = [] - if n_mdg > 0: - ind_parts.append(f"**{n_mdg} MDGs**") - if n_sdg > 0: - ind_parts.append(f"**{n_sdg} SDGs**") - ind_detail = f" ({', '.join(ind_parts)})" if ind_parts else "" + # ---- 1. Status tahun ini vs threshold ---- + perf_word_en = "good" if performance_status == "Good" else "below target" + perf_word_id = "baik" if performance_status == "Good" else "di bawah target" - sent1 = ( - f"In **{year}**, ASEAN scored **{_fmt_score(score)}** (*{performance_status}*) " - f"across **{n_total_ind} indicators**{ind_detail}." + s1_en = ( + f"In {year}, ASEAN food security scored {_fmt_score(score)} out of 100 " + f"({perf_word_en}), covering {n_mdg + n_sdg} indicators " + f"({n_mdg} MDGs and {n_sdg} SDGs)." ) + s1_id = ( + f"Pada tahun {year}, skor ketahanan pangan ASEAN mencapai {_fmt_score(score)} dari 100 " + f"({perf_word_id}), mencakup {n_mdg + n_sdg} indikator " + f"({n_mdg} MDGs dan {n_sdg} SDGs)." + ) + sentences_en.append(s1_en) + sentences_id.append(s1_id) - # Sentence 2: YoY - if yoy_val is not None and prev_score is not None: - direction_word = "increased" if yoy_val >= 0 else "decreased" - sent2 = ( - f"Score {direction_word} by **{abs(yoy_val):.2f} pts** " - f"from {prev_year} ({_fmt_score(prev_score)}, *{prev_performance_status}*)." - ) - else: - sent2 = "No prior-year data available for comparison." - - # Sentence 3: country ranking - sent3 = "" - if ranking_list: - first = ranking_list[0] - last = ranking_list[-1] - if len(ranking_list) == 1: - sent3 = f"**{first['country_name']}** was the only country assessed ({_fmt_score(first['score'])})." + # ---- 2. Kondisi YoY tahun ini ---- + if yoy_val is not None and not pd.isna(yoy_val): + if abs(yoy_val) < 0.5: + s2_en = f"The score was relatively stable compared to the previous year." + s2_id = f"Skor relatif stabil dibandingkan tahun sebelumnya." 
+ elif yoy_val > 0: + s2_en = f"This represents an improvement of {abs(yoy_val):.2f} points from the previous year." + s2_id = f"Ini merupakan peningkatan sebesar {abs(yoy_val):.2f} poin dari tahun sebelumnya." else: - sent3 = ( - f"**{first['country_name']}** led the region ({_fmt_score(first['score'])}); " - f"**{last['country_name']}** ranked last ({_fmt_score(last['score'])})." + s2_en = f"This represents a decline of {abs(yoy_val):.2f} points from the previous year." + s2_id = f"Ini merupakan penurunan sebesar {abs(yoy_val):.2f} poin dari tahun sebelumnya." + sentences_en.append(s2_en) + sentences_id.append(s2_id) + + # ---- 3. Tren historis (baca dari semua data yang ada) ---- + hist_years = sorted(historical_scores.keys()) + hist_scores = [historical_scores[y] for y in hist_years if not pd.isna(historical_scores.get(y, np.nan))] + + if len(hist_scores) >= 3: + trend = _detect_series_trend(hist_scores) + if trend == "improving_consistent": + s3_en = f"The overall trajectory since {hist_years[0]} has been consistently upward." + s3_id = f"Trajektori keseluruhan sejak {hist_years[0]} menunjukkan tren yang konsisten meningkat." + elif trend == "improving_slowing": + s3_en = f"While the long-term trend since {hist_years[0]} is positive, the pace of improvement has slowed in recent years." + s3_id = f"Meskipun tren jangka panjang sejak {hist_years[0]} positif, laju perbaikan melambat dalam beberapa tahun terakhir." + elif trend == "deteriorating": + s3_en = f"The overall trend since {hist_years[0]} shows a declining trajectory that warrants attention." + s3_id = f"Tren keseluruhan sejak {hist_years[0]} menunjukkan trajektori yang menurun dan perlu perhatian." + elif trend == "fluctuating": + s3_en = f"Progress since {hist_years[0]} has been uneven, with scores fluctuating across years." + s3_id = f"Kemajuan sejak {hist_years[0]} tidak merata, dengan skor yang berfluktuasi antar tahun." 
+ else: + s3_en = "" + s3_id = "" + + if s3_en: + sentences_en.append(s3_en) + sentences_id.append(s3_id) + + # ---- 4. Gap antar negara ---- + if not country_scores_all.empty: + gap_trend = _detect_country_gap( + country_scores_all[country_scores_all["year"] <= year], + "framework_score_1_100" + ) + if gap_trend == "widening": + s4_en = "The performance gap among ASEAN member states has widened over time, indicating unequal progress." + s4_id = "Kesenjangan performa antar negara anggota ASEAN semakin melebar, mengindikasikan kemajuan yang tidak merata." + elif gap_trend == "narrowing": + s4_en = "The performance gap among ASEAN member states has narrowed, reflecting more balanced regional development." + s4_id = "Kesenjangan performa antar negara anggota ASEAN semakin menyempit, mencerminkan pembangunan regional yang lebih merata." + elif gap_trend == "stable": + s4_en = "The performance gap among ASEAN member states has remained relatively stable." + s4_id = "Kesenjangan performa antar negara anggota ASEAN relatif stabil." + else: + s4_en = "" + s4_id = "" + + if s4_en: + sentences_en.append(s4_en) + sentences_id.append(s4_id) + + # ---- 5. Top dan bottom country tahun ini ---- + if ranking_list and len(ranking_list) >= 2: + top = ranking_list[0] + bottom = ranking_list[-1] + s5_en = ( + f"In {year}, {top['country_name']} led the region with a score of " + f"{_fmt_score(top['score'])}, while {bottom['country_name']} ranked last " + f"at {_fmt_score(bottom['score'])}." + ) + s5_id = ( + f"Pada tahun {year}, {top['country_name']} memimpin kawasan dengan skor " + f"{_fmt_score(top['score'])}, sementara {bottom['country_name']} berada di " + f"posisi terbawah dengan skor {_fmt_score(bottom['score'])}." + ) + sentences_en.append(s5_en) + sentences_id.append(s5_id) + + # ---- 6. 
Most improved / declined country ---- + if most_improved_country and most_declined_country: + if most_improved_country != most_declined_country: + s6_en = ( + f"{most_improved_country} showed the biggest improvement " + f"({_fmt_delta(most_improved_delta)} pts), " + f"while {most_declined_country} experienced the largest decline " + f"({_fmt_delta(most_declined_delta)} pts)." ) + s6_id = ( + f"{most_improved_country} mencatat peningkatan terbesar " + f"({_fmt_delta(most_improved_delta)} poin), " + f"sementara {most_declined_country} mengalami penurunan terbesar " + f"({_fmt_delta(most_declined_delta)} poin)." + ) + sentences_en.append(s6_en) + sentences_id.append(s6_id) - # Sentence 4: most improved / declined - sent4_parts = [] - if most_improved_country and most_improved_delta is not None: - sent4_parts.append(f"Biggest gain: **{most_improved_country}** ({_fmt_delta(most_improved_delta)} pts)") - if most_declined_country and most_declined_delta is not None: - sent4_parts.append(f"biggest drop: **{most_declined_country}** ({_fmt_delta(most_declined_delta)} pts)") - sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else "" - if sent4: - sent4 = sent4[0].upper() + sent4[1:] + narrative_en = " ".join(s for s in sentences_en if s) + narrative_id = " ".join(s for s in sentences_id if s) + return narrative_en, narrative_id - return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) +# ============================================================================= +# NARRATIVE BUILDER — PILLAR (per tahun per pilar) +# ============================================================================= def _build_pillar_narrative( year: int, @@ -283,70 +426,137 @@ def _build_pillar_narrative( rank_in_year: int, n_pillars: int, yoy_val, - top_country, + top_country: str, top_country_score, - bot_country, + bot_country: str, bot_country_score, - strongest_pillar, - strongest_score, - weakest_pillar, - weakest_score, - most_improved_pillar, - most_improved_delta, - 
most_declined_pillar, - most_declined_delta, -) -> str: + pillar_scores_history: dict, # {year: score} untuk pilar ini + all_pillar_scores_year: pd.DataFrame, # df [pillar_name, pillar_score_1_100] tahun ini + country_pillar_all: pd.DataFrame, # df [year, country_id, pillar_country_score_1_100] pilar ini +) -> tuple: """ - Narrative format (no em-dash): - In {year}, {pillar} ranked {rank}/{n} with score {score}, {up/down} {delta} pts YoY. - Top country: {top_country}; bottom: {bot_country}. - Strongest pillar: {pillar}; weakest: {pillar}. + Narasi pillar per tahun — interpretatif, plain text, bilingual. + Return: (narrative_en, narrative_id) """ + sentences_en = [] + sentences_id = [] + # ---- 1. Posisi pilar tahun ini ---- rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") + perf_word_en = "good" if pillar_score >= PERFORMANCE_THRESHOLD else "below target" + perf_word_id = "baik" if pillar_score >= PERFORMANCE_THRESHOLD else "di bawah target" - # Sentence 1: rank + score + YoY - if yoy_val is not None: - direction_word = "up" if yoy_val >= 0 else "down" - yoy_clause = f", {direction_word} **{abs(yoy_val):.2f} pts** YoY" - else: - yoy_clause = ", no prior-year data" - - sent1 = ( - f"In **{year}**, **{pillar_name}** ranked **{rank_in_year}{rank_suffix}/{n_pillars}** " - f"with score **{_fmt_score(pillar_score)}**{yoy_clause}." + s1_en = ( + f"In {year}, the {pillar_name} pillar ranked {rank_in_year}{rank_suffix} out of " + f"{n_pillars} pillars with a score of {_fmt_score(pillar_score)} ({perf_word_en})." ) + s1_id = ( + f"Pada tahun {year}, pilar {pillar_name} menempati peringkat {rank_in_year} dari " + f"{n_pillars} pilar dengan skor {_fmt_score(pillar_score)} ({perf_word_id})." 
+ ) + sentences_en.append(s1_en) + sentences_id.append(s1_id) - # Sentence 2: top / bottom country - sent2 = "" - if top_country and bot_country: - if top_country != bot_country: - sent2 = ( - f"Top country: **{top_country}** ({_fmt_score(top_country_score)}); " - f"bottom: **{bot_country}** ({_fmt_score(bot_country_score)})." - ) + # ---- 2. YoY pilar ini ---- + if yoy_val is not None and not pd.isna(yoy_val): + if abs(yoy_val) < 0.5: + s2_en = "Performance was relatively stable compared to the previous year." + s2_id = "Performa relatif stabil dibandingkan tahun sebelumnya." + elif yoy_val > 0: + s2_en = f"This is an improvement of {abs(yoy_val):.2f} points from the previous year." + s2_id = f"Ini merupakan peningkatan {abs(yoy_val):.2f} poin dari tahun sebelumnya." else: - sent2 = f"**{top_country}** was the only country with data ({_fmt_score(top_country_score)})." + s2_en = f"This marks a decline of {abs(yoy_val):.2f} points from the previous year." + s2_id = f"Ini menandai penurunan {abs(yoy_val):.2f} poin dari tahun sebelumnya." + sentences_en.append(s2_en) + sentences_id.append(s2_id) - # Sentence 3: strongest / weakest pillar - sent3 = "" - if strongest_pillar and weakest_pillar: - sent3 = ( - f"Strongest pillar: **{strongest_pillar}** ({_fmt_score(strongest_score)}); " - f"weakest: **{weakest_pillar}** ({_fmt_score(weakest_score)})." + # ---- 3. Tren historis pilar ini ---- + hist_years = sorted(pillar_scores_history.keys()) + hist_scores = [ + pillar_scores_history[y] + for y in hist_years + if not pd.isna(pillar_scores_history.get(y, np.nan)) + ] + + if len(hist_scores) >= 3: + trend = _detect_series_trend(hist_scores) + if trend == "improving_consistent": + s3_en = f"This pillar has shown consistent improvement since {hist_years[0]}." + s3_id = f"Pilar ini menunjukkan perbaikan yang konsisten sejak {hist_years[0]}." + elif trend == "improving_slowing": + s3_en = f"While the pillar improved since {hist_years[0]}, the pace has slowed in recent years." 
+ s3_id = f"Meskipun pilar ini membaik sejak {hist_years[0]}, lajunya melambat dalam beberapa tahun terakhir." + elif trend == "deteriorating": + s3_en = f"This pillar has shown a declining trend since {hist_years[0]}, requiring targeted intervention." + s3_id = f"Pilar ini menunjukkan tren penurunan sejak {hist_years[0]}, memerlukan intervensi yang terarah." + elif trend == "fluctuating": + s3_en = f"Performance in this pillar has been inconsistent since {hist_years[0]}, with no clear trend." + s3_id = f"Performa pilar ini tidak konsisten sejak {hist_years[0]}, tanpa tren yang jelas." + else: + s3_en = "" + s3_id = "" + + if s3_en: + sentences_en.append(s3_en) + sentences_id.append(s3_id) + + # ---- 4. Gap antar negara dalam pilar ini ---- + if not country_pillar_all.empty: + gap_trend = _detect_country_gap( + country_pillar_all[country_pillar_all["year"] <= year], + "pillar_country_score_1_100" ) + if gap_trend == "widening": + s4_en = "Country disparities within this pillar have widened over time." + s4_id = "Kesenjangan antar negara dalam pilar ini semakin melebar seiring waktu." + elif gap_trend == "narrowing": + s4_en = "Country disparities within this pillar have narrowed, indicating more balanced progress." + s4_id = "Kesenjangan antar negara dalam pilar ini menyempit, mengindikasikan kemajuan yang lebih merata." 
+ else: + s4_en = "" + s4_id = "" - # Sentence 4: most improved / declined pillar - sent4_parts = [] - if most_improved_pillar and most_improved_delta is not None: - sent4_parts.append(f"Best gain: **{most_improved_pillar}** ({_fmt_delta(most_improved_delta)} pts)") - if most_declined_pillar and most_declined_delta is not None: - sent4_parts.append(f"largest drop: **{most_declined_pillar}** ({_fmt_delta(most_declined_delta)} pts)") - sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else "" - if sent4: - sent4 = sent4[0].upper() + sent4[1:] + if s4_en: + sentences_en.append(s4_en) + sentences_id.append(s4_id) - return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) + # ---- 5. Top/bottom country dalam pilar ini ---- + if top_country and bot_country and top_country != bot_country: + s5_en = ( + f"{top_country} performed best in this pillar ({_fmt_score(top_country_score)}), " + f"while {bot_country} had the lowest score ({_fmt_score(bot_country_score)})." + ) + s5_id = ( + f"{top_country} memiliki performa terbaik dalam pilar ini ({_fmt_score(top_country_score)}), " + f"sementara {bot_country} memiliki skor terendah ({_fmt_score(bot_country_score)})." + ) + sentences_en.append(s5_en) + sentences_id.append(s5_id) + + # ---- 6. Posisi relatif pilar ini vs pilar lain ---- + if not all_pillar_scores_year.empty and len(all_pillar_scores_year) > 1: + sorted_pillars = all_pillar_scores_year.sort_values("pillar_score_1_100", ascending=False) + strongest = sorted_pillars.iloc[0] + weakest = sorted_pillars.iloc[-1] + + if strongest["pillar_name"] != pillar_name and weakest["pillar_name"] != pillar_name: + s6_en = ( + f"Across all pillars in {year}, {strongest['pillar_name']} scored highest " + f"({_fmt_score(strongest['pillar_score_1_100'])}) and {weakest['pillar_name']} " + f"scored lowest ({_fmt_score(weakest['pillar_score_1_100'])})." 
+ ) + s6_id = ( + f"Di antara semua pilar pada tahun {year}, {strongest['pillar_name']} mendapat skor " + f"tertinggi ({_fmt_score(strongest['pillar_score_1_100'])}) dan {weakest['pillar_name']} " + f"mendapat skor terendah ({_fmt_score(weakest['pillar_score_1_100'])})." + ) + sentences_en.append(s6_en) + sentences_id.append(s6_id) + + narrative_en = " ".join(s for s in sentences_en if s) + narrative_id = " ".join(s for s in sentences_id if s) + return narrative_en, narrative_id # ============================================================================= @@ -371,8 +581,6 @@ class FoodSecurityAggregator: self.df = None self.sdgs_start_year = None - - # Lookup: (indicator_id, year) -> framework label self._ind_year_framework: pd.DataFrame = None # ========================================================================= @@ -398,31 +606,23 @@ class FoodSecurityAggregator: missing_cols = required_cols - set(self.df.columns) if missing_cols: raise ValueError( - f"Kolom berikut tidak ditemukan di fact_asean_food_security_selected: {missing_cols}" + f"Kolom berikut tidak ditemukan: {missing_cols}" ) n_null_dir = self.df["direction"].isna().sum() if n_null_dir > 0: - self.logger.warning( - f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'" - ) + self.logger.warning(f" [DIRECTION] {n_null_dir} rows NULL -> diisi 'positive'") self.df["direction"] = self.df["direction"].fillna("positive") - dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() - self.logger.info(f"\n Distribusi direction per indikator:") - for d, cnt in dir_dist.items(): - tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal" - self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]") - - self.logger.info(f"\n Rows loaded : {len(self.df):,}") - self.logger.info(f" Negara : {self.df['country_id'].nunique()}") - self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}") + self.logger.info(f" Rows : {len(self.df):,}") + 
self.logger.info(f" Countries : {self.df['country_id'].nunique()}") + self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}") self.logger.info( - f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}" + f" Years : {int(self.df['year'].min())} - {int(self.df['year'].max())}" ) # ========================================================================= - # STEP 1b: Detect sdgs_start_year + assign framework per (indicator, year) + # STEP 1b: Detect sdgs_start_year + assign framework # ========================================================================= def _detect_sdgs_start_year(self) -> int: @@ -440,7 +640,6 @@ class FoodSecurityAggregator: ) unique_years = sorted(ind_min_year["min_year"].unique()) if len(unique_years) == 1: - self.logger.info(" [Fallback] Hanya 1 cluster -> semua MDGs") return int(unique_years[0]) + 9999 gaps = [ @@ -449,12 +648,12 @@ class FoodSecurityAggregator: ] gaps.sort(reverse=True) _, y_before, y_after = gaps[0] - self.logger.info(f" [Fallback gap] sdgs_start_year = {y_after} (gap {y_before}->{y_after})") + self.logger.info(f" [Fallback gap] sdgs_start_year = {y_after}") return int(y_after) def _assign_framework_labels(self): self.logger.info("\n" + "=" * 70) - self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS (per indicator per year)") + self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS") self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}") self.logger.info("=" * 70) @@ -477,21 +676,6 @@ class FoodSecurityAggregator: for fw, cnt in fw_dist.items(): self.logger.info(f" {fw:<6}: {cnt:,} rows") - ind_fw_yr = ( - self._ind_year_framework - .groupby(["year", "framework"])["indicator_id"] - .nunique() - .reset_index() - .rename(columns={"indicator_id": "n_indicators"}) - .sort_values(["year", "framework"]) - ) - self.logger.info(f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}") - self.logger.info(" " + "-" * 30) - for _, r in ind_fw_yr.iterrows(): - self.logger.info( - f" {int(r['year']):<6} 
{r['framework']:<8} {int(r['n_indicators'])}" - ) - def _count_framework_indicators(self, year: int, framework: str) -> int: mask = ( (self._ind_year_framework["year"] == year) & @@ -505,9 +689,7 @@ class FoodSecurityAggregator: def _get_norm_value_df(self) -> pd.DataFrame: if "framework" not in self.df.columns: - raise ValueError( - "Kolom 'framework' tidak ada. Pastikan _assign_framework_labels() dipanggil lebih dulu." - ) + raise ValueError("Kolom 'framework' tidak ada.") norm_parts = [] for ind_id, grp in self.df.groupby("indicator_id"): @@ -737,7 +919,6 @@ class FoodSecurityAggregator: df_normed = self._get_norm_value_df() parts = [] - # TOTAL agg_total = ( country_composite[[ "country_id", "country_name", "year", @@ -752,7 +933,6 @@ class FoodSecurityAggregator: agg_total["framework"] = "Total" parts.append(agg_total) - # MDGs pre-SDGs pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() if not pre_sdgs_rows.empty: mdgs_pre = ( @@ -769,7 +949,6 @@ class FoodSecurityAggregator: mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # MDGs mixed (year >= sdgs_start_year, hanya indikator MDGs) mdgs_indicator_ids = set( self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] ) @@ -793,7 +972,6 @@ class FoodSecurityAggregator: agg_mdgs_mixed["framework"] = "MDGs" parts.append(agg_mdgs_mixed) - # SDGs sdgs_indicator_ids = set( self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] ) @@ -864,7 +1042,7 @@ class FoodSecurityAggregator: raise # ========================================================================= - # STEP 5: agg_framework_asean (+ performance_status) + # STEP 5: agg_framework_asean # ========================================================================= def calc_framework_asean(self) -> pd.DataFrame: @@ -872,7 +1050,6 @@ class FoodSecurityAggregator: self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" 
* 70) self.logger.info(f"STEP 5: {table_name} -> [Gold] fs_asean_gold") - self.logger.info(f" performance_status threshold: {PERFORMANCE_THRESHOLD}") self.logger.info("=" * 70) try: @@ -907,7 +1084,6 @@ class FoodSecurityAggregator: def _n_ind(year_val, framework_val): return self._count_framework_indicators(year_val, framework_val) - # TOTAL total_cols = asean_overall[[ "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" ]].copy().rename(columns={ @@ -923,7 +1099,6 @@ class FoodSecurityAggregator: total_cols["framework"] = "Total" parts.append(total_cols) - # MDGs pre-SDGs pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() if not pre_sdgs.empty: mdgs_pre = pre_sdgs[[ @@ -937,7 +1112,6 @@ class FoodSecurityAggregator: mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # MDGs mixed mdgs_indicator_ids = set( self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] ) @@ -963,7 +1137,6 @@ class FoodSecurityAggregator: asean_mdgs["framework"] = "MDGs" parts.append(asean_mdgs) - # SDGs sdgs_indicator_ids = set( self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] ) @@ -1005,21 +1178,9 @@ class FoodSecurityAggregator: df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) for col in ["framework_norm", "std_norm", "framework_score_1_100"]: df[col] = df[col].astype(float) - df["performance_status"] = df["performance_status"].astype(str) self._validate_mdgs_equals_total(df, level="asean") - self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):") - for fw in df["framework"].unique(): - sub = df[df["framework"] == fw].sort_values("year") - for _, r in sub.iterrows(): - self.logger.info( - f" {fw:<8} {int(r['year'])}: " - f"score={r['framework_score_1_100']:.2f} " - f"n_ind={int(r['n_indicators'])} " - f"-> {r['performance_status']}" - ) - schema = [ 
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), @@ -1055,6 +1216,7 @@ class FoodSecurityAggregator: self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") + self.logger.info(" Narrative: interpretatif, plain text, bilingual EN/ID") self.logger.info("=" * 70) try: @@ -1122,23 +1284,23 @@ class FoodSecurityAggregator: most_improved_country = most_declined_country = None most_improved_delta = most_declined_delta = None - narrative = _build_overview_narrative( - year = yr, - n_mdg = n_mdg, - n_sdg = n_sdg, - n_total_ind = n_total_ind, - score = score, - performance_status = perf_status, - yoy_val = yoy_val, - yoy_pct = yoy_pct, - prev_year = yr - 1, - prev_score = prev_score, - prev_performance_status = prev_status, - ranking_list = ranking_list, - most_improved_country = most_improved_country, - most_improved_delta = most_improved_delta, - most_declined_country = most_declined_country, - most_declined_delta = most_declined_delta, + # Semua data skor negara untuk gap analysis + country_scores_all = country_total[["year", "country_id", "framework_score_1_100"]].copy() + + narrative_en, narrative_id = _build_overview_narrative( + year = yr, + score = score, + performance_status = perf_status, + yoy_val = yoy_val, + n_mdg = n_mdg, + n_sdg = n_sdg, + ranking_list = ranking_list, + most_improved_country = most_improved_country, + most_improved_delta = most_improved_delta, + most_declined_country = most_declined_country, + most_declined_delta = most_declined_delta, + historical_scores = score_by_year, + country_scores_all = country_scores_all, ) records.append({ @@ -1155,7 +1317,8 @@ class FoodSecurityAggregator: "most_improved_delta": most_improved_delta, "most_declined_country": most_declined_country, "most_declined_delta": most_declined_delta, - "narrative_overview": narrative, + "narrative_en": 
narrative_en, + "narrative_id": narrative_id, }) df = pd.DataFrame(records) @@ -1165,9 +1328,16 @@ class FoodSecurityAggregator: df["n_total_indicators"] = df["n_total_indicators"].astype(int) df["asean_total_score"] = df["asean_total_score"].astype(float) df["performance_status"] = df["performance_status"].astype(str) + df["narrative_en"] = df["narrative_en"].astype(str) + df["narrative_id"] = df["narrative_id"].astype(str) for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + self.logger.info("\n Sample narrative_en (year 1):") + self.logger.info(f" {df.iloc[0]['narrative_en'][:300]}") + self.logger.info("\n Sample narrative_id (year 1):") + self.logger.info(f" {df.iloc[0]['narrative_id'][:300]}") + schema = [ bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), @@ -1182,7 +1352,8 @@ class FoodSecurityAggregator: bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), + bigquery.SchemaField("narrative_en", "STRING", mode="REQUIRED"), + bigquery.SchemaField("narrative_id", "STRING", mode="REQUIRED"), ] rows = load_to_bigquery( self.client, df, table_name, layer='gold', @@ -1208,12 +1379,20 @@ class FoodSecurityAggregator: self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold") + self.logger.info(" Narrative: interpretatif, plain text, bilingual EN/ID") self.logger.info("=" * 70) try: records = [] years = sorted(df_pillar_composite["year"].unique()) + # Precompute history per pillar + pillar_history = {} + for p_id, grp in 
df_pillar_composite.groupby("pillar_id"): + pillar_history[p_id] = dict( + zip(grp["year"].astype(int), grp["pillar_score_1_100"].astype(float)) + ) + for yr in years: yr_pillars = ( df_pillar_composite[df_pillar_composite["year"] == yr] @@ -1222,21 +1401,6 @@ class FoodSecurityAggregator: ) yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] - strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None - weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None - - yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) - if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() - most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) - most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) - most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) - most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) - else: - most_improved_pillar = most_declined_pillar = None - most_improved_delta = most_declined_delta = None - for _, prow in yr_pillars.iterrows(): p_id = int(prow["pillar_id"]) p_name = str(prow["pillar_name"]) @@ -1259,25 +1423,30 @@ class FoodSecurityAggregator: top_country = bot_country = None top_country_score = bot_country_score = None - narrative = _build_pillar_narrative( - year = yr, - pillar_name = p_name, - pillar_score = p_score, - rank_in_year = p_rank, - n_pillars = len(yr_pillars), - yoy_val = p_yoy_val, - top_country = top_country, - top_country_score = top_country_score, - bot_country = bot_country, - bot_country_score = bot_country_score, - strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, - strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, - 
weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, - weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, - most_improved_pillar = most_improved_pillar, - most_improved_delta = most_improved_delta, - most_declined_pillar = most_declined_pillar, - most_declined_delta = most_declined_delta, + # Data historis hanya sampai tahun ini + hist_up_to_yr = { + y: s for y, s in pillar_history.get(p_id, {}).items() if y <= yr + } + + # Data negara-pilar ini semua tahun (untuk gap analysis) + country_pillar_all = df_pillar_by_country[ + df_pillar_by_country["pillar_id"] == p_id + ][["year", "country_id", "pillar_country_score_1_100"]].copy() + + narrative_en, narrative_id = _build_pillar_narrative( + year = yr, + pillar_name = p_name, + pillar_score = p_score, + rank_in_year = p_rank, + n_pillars = len(yr_pillars), + yoy_val = p_yoy_val, + top_country = top_country, + top_country_score = top_country_score, + bot_country = bot_country, + bot_country_score = bot_country_score, + pillar_scores_history = hist_up_to_yr, + all_pillar_scores_year= yr_pillars[["pillar_name", "pillar_score_1_100"]].copy(), + country_pillar_all = country_pillar_all, ) records.append({ @@ -1291,16 +1460,24 @@ class FoodSecurityAggregator: "top_country_score": top_country_score, "bottom_country": bot_country, "bottom_country_score": bot_country_score, - "narrative_pillar": narrative, + "narrative_en": narrative_en, + "narrative_id": narrative_id, }) df = pd.DataFrame(records) df["year"] = df["year"].astype(int) df["pillar_id"] = df["pillar_id"].astype(int) df["rank_in_year"] = df["rank_in_year"].astype(int) + df["narrative_en"] = df["narrative_en"].astype(str) + df["narrative_id"] = df["narrative_id"].astype(str) for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + self.logger.info("\n Sample narrative_en 
(first row):") + self.logger.info(f" {df.iloc[0]['narrative_en'][:300]}") + self.logger.info("\n Sample narrative_id (first row):") + self.logger.info(f" {df.iloc[0]['narrative_id'][:300]}") + schema = [ bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), @@ -1312,7 +1489,8 @@ class FoodSecurityAggregator: bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), + bigquery.SchemaField("narrative_en", "STRING", mode="REQUIRED"), + bigquery.SchemaField("narrative_id", "STRING", mode="REQUIRED"), ] rows = load_to_bigquery( self.client, df, table_name, layer='gold', @@ -1338,7 +1516,7 @@ class FoodSecurityAggregator: self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") return if mdgs_pre.empty or total_pre.empty: - self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}") + self.logger.warning(f" -> [WARNING] Salah satu kosong") return check = mdgs_pre.merge(total_pre, on=group_by) max_diff = (check["mdgs_score"] - check["total_score"]).abs().max() @@ -1348,15 +1526,12 @@ class FoodSecurityAggregator: def _finalize(self, table_name: str, rows_loaded: int): end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") - self.load_metadata[table_name].update({ "rows_loaded": rows_loaded, "status" : "success", "end_time" : end_time, }) - log_update(self.client, "DW", table_name, "full_load", rows_loaded) - try: save_etl_metadata( self.client, @@ -1369,24 +1544,15 @@ class FoodSecurityAggregator: ) ) except Exception as meta_err: - self.logger.warning( - f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}" - ) - + self.logger.warning(f" [METADATA WARNING] {table_name}: 
{meta_err}") self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") def _fail(self, table_name: str, error: Exception): end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") error_msg = str(error) - - self.load_metadata[table_name].update({ - "status" : "failed", - "end_time": end_time, - }) - + self.load_metadata[table_name].update({"status": "failed", "end_time": end_time}) log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg) - try: save_etl_metadata( self.client, @@ -1400,10 +1566,7 @@ class FoodSecurityAggregator: ) ) except Exception as meta_err: - self.logger.warning( - f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}" - ) - + self.logger.warning(f" [METADATA WARNING] {table_name}: {meta_err}") self.logger.error(f" [FAIL] {table_name}: {error_msg}") # ========================================================================= @@ -1414,11 +1577,8 @@ class FoodSecurityAggregator: start = datetime.now() self.logger.info("\n" + "=" * 70) self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold") - self.logger.info(" Source : fact_asean_food_security_selected") - self.logger.info(" Outputs : agg_pillar_composite | agg_pillar_by_country") - self.logger.info(" agg_framework_by_country | agg_framework_asean") - self.logger.info(" agg_narrative_overview | agg_narrative_pillar") - self.logger.info(f" Performance threshold: {PERFORMANCE_THRESHOLD} (Good/Bad)") + self.logger.info(f" Performance threshold: {PERFORMANCE_THRESHOLD}") + self.logger.info(f" Narrative style : interpretive, plain text, bilingual EN/ID") self.logger.info("=" * 70) self.load_data() @@ -1479,7 +1639,6 @@ if __name__ == "__main__": print("=" * 70) print("FOOD SECURITY AGGREGATION -> fs_asean_gold") - print(f" Source : fact_asean_food_security_selected") print(f" NORMALIZE_FRAMEWORKS_JOINTLY : {NORMALIZE_FRAMEWORKS_JOINTLY}") print(f" PERFORMANCE_THRESHOLD : 
{PERFORMANCE_THRESHOLD}") print("=" * 70)