diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index 94ffb1a..ecd8422 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -244,94 +244,163 @@ def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple: return direction_word, change_desc, is_positive +# ============================================================================= +# PURE HELPER — narrative builder (per indicator, all years, all countries) +# ====================================================================== -def _build_narrative(row: pd.Series) -> str: +def _build_narrative_per_indicator(row: pd.Series) -> str: """ - Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id). + Bangun 1 paragraf narasi ASEAN-level untuk satu indikator, + merangkum seluruh periode (year_min - year_max) dan seluruh negara. + + Kolom yang dibutuhkan dari row: + indicator_name, unit, direction, pillar_name, framework, + year_min, year_max, n_countries, + avg_value_first, avg_value_last, + avg_norm_score_1_100, -- rata-rata seluruh periode + performance, -- Good | Bad | null + n_yoy_total, -- total transisi year-on-year + n_yoy_positive, -- jumlah transisi yang membaik + best_yoy_from, best_yoy_to, -- periode dengan perbaikan terbesar + country_worst, country_best -- negara dengan nilai terburuk / terbaik """ - year = int(row["year"]) ind_name = str(row["indicator_name"]).strip() unit = str(row["unit"]).strip() if row["unit"] else "" direction = str(row["direction"]).strip() pillar = str(row["pillar_name"]).strip() framework = str(row["framework"]).strip() - avg_val = row["avg_value"] + year_min = int(row["year_min"]) + year_max = int(row["year_max"]) + n_countries = int(row["n_countries"]) avg_score = row["avg_norm_score_1_100"] performance = row["performance"] - yoy = row["yoy_avg_value"] - n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0 - + + avg_first = row["avg_value_first"] + avg_last = row["avg_value_last"] + + n_yoy_total = int(row["n_yoy_total"]) if not pd.isna(row["n_yoy_total"]) else 0 + n_yoy_positive = int(row["n_yoy_positive"]) if not pd.isna(row["n_yoy_positive"]) else 0 + + best_yoy_from = row["best_yoy_from"] + best_yoy_to = row["best_yoy_to"] + + country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None + country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None + lower_better = _is_lower_better(direction) direction_label = ( - "lower values indicate better outcomes" if lower_better + "lower values indicate better outcomes" + if lower_better else "higher values indicate better outcomes" ) - - # --- Bagian 1: Nilai rata-rata ASEAN --- - val_str = _format_value(avg_val, unit) + + # ---- Kalimat 1: Identifikasi indikator + cakupan ------------------------- + member_str = f"{n_countries} member state{'s' if n_countries > 1 else ''}" sentence1 = ( - f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}" + f"Across ASEAN, {ind_name} under the {framework} framework " + f"({pillar} pillar) was monitored from {year_min} to {year_max} " + f"across {member_str}." ) - if n_countries > 0: - sentence1 += ( - f", based on data from {n_countries} " - f"ASEAN member state{'s' if n_countries > 1 else ''}" + + # ---- Kalimat 2: Tren keseluruhan (first → last) -------------------------- + if not pd.isna(avg_first) and not pd.isna(avg_last): + diff = avg_last - avg_first + abs_diff = abs(diff) + + # Format nilai + def fmt(v): + if abs(v) >= 1000: + return f"{v:,.1f}" + elif abs(v) >= 10: + return f"{v:.2f}" + else: + return f"{v:.3f}" + + first_str = f"{fmt(avg_first)}{' ' + unit if unit else ''}" + last_str = f"{fmt(avg_last)}{' ' + unit if unit else ''}" + diff_str = f"{fmt(abs_diff)}{' ' + unit if unit else ''}" + + # Apakah tren menguntungkan? + is_improving = (diff < 0) if lower_better else (diff > 0) + trend_word = "improving" if is_improving else "deteriorating" + verb = "declining" if diff < 0 else "rising" + + sentence2 = ( + f"Since {direction_label}, the region collectively showed " + f"{'an' if trend_word[0] in 'aeiou' else 'a'} {trend_word} trend, " + f"with the ASEAN average {verb} from {first_str} in {year_min} " + f"to {last_str} in {year_max} " + f"(a cumulative {'reduction' if diff < 0 else 'increase'} of {diff_str})." ) - sentence1 += "." - - # --- Bagian 2: Score dan performance --- + else: + sentence2 = ( + f"Since {direction_label}, trend analysis could not be performed " + f"due to missing data at the start or end of the period." + ) + + # ---- Kalimat 3: Score + performance ------------------------------------- if not pd.isna(avg_score): score_str = f"{avg_score:.1f} out of 100" if performance == "Good": - perf_phrase = ( - f"The region achieved a normalized score of {score_str}, " - f"classified as Good performance meeting the 60-point threshold " - f"under the {framework} framework ({pillar} pillar)." + sentence3 = ( + f"The regional normalized score averaged {score_str} " + f"classified as Good performance." ) elif performance == "Bad": - perf_phrase = ( - f"The region recorded a normalized score of {score_str}, " - f"classified as Bad performance falling below the 60-point threshold " - f"under the {framework} framework ({pillar} pillar)." + sentence3 = ( + f"The regional normalized score averaged {score_str} " + f"classified as Bad performance, falling below the 60-point threshold." ) else: - perf_phrase = ( - f"The region recorded a normalized score of {score_str} " - f"under the {framework} framework ({pillar} pillar)." + sentence3 = ( + f"The regional normalized score averaged {score_str}." ) else: - perf_phrase = ( - f"Performance could not be assessed due to insufficient data " - f"under the {framework} framework ({pillar} pillar)." - ) - - # --- Bagian 3 & 4: Arah + YoY --- - direction_phrase = f"Since {direction_label} for this indicator" - - if not pd.isna(yoy) and yoy != 0: - direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better) - if is_positive: - trend_word = "a positive trend" - tone = "reflecting improvements in regional food security performance" + sentence3 = "The regional normalized performance score could not be assessed." + + # ---- Kalimat 4: Negara terbaik & terburuk -------------------------------- + if country_worst and country_best and country_worst != country_best: + if lower_better: + worst_label = "highest (most concerning)" + best_label = "consistently performed best (lowest values)" else: - trend_word = "a deteriorating trend" - tone = "signaling the need for greater regional attention and policy response" - yoy_phrase = ( - f"{direction_phrase}, the regional average {direction_word} {change_desc} " - f"compared to {year - 1}, reflecting {trend_word} — {tone}." + worst_label = "lowest (most concerning)" + best_label = "consistently performed best (highest values)" + + sentence4 = ( + f"Among member states, {country_worst} recorded the {worst_label} " + f"levels throughout the period, while {country_best} {best_label}." ) - elif pd.isna(yoy): - yoy_phrase = ( - f"No prior year data is available for comparison, " - f"as this is the earliest recorded year for this indicator in the dataset." + elif country_best: + sentence4 = ( + f"Among member states, {country_best} consistently recorded the " + f"best performance throughout the period." ) else: - yoy_phrase = ( - f"{direction_phrase}, the regional average remained stable " - f"compared to {year - 1}, with no measurable change year-on-year." + sentence4 = "" + + # ---- Kalimat 5: YoY transitions ----------------------------------------- + if n_yoy_total > 0: + yoy_sentence = ( + f"Year-on-year, the region improved in {n_yoy_positive} out of " + f"{n_yoy_total} transition{'s' if n_yoy_total > 1 else ''}" ) - - return f"{sentence1} {perf_phrase} {yoy_phrase}" + if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to): + yoy_sentence += ( + f", with the largest regional gain occurring between " + f"{int(best_yoy_from)} and {int(best_yoy_to)}." + ) + else: + yoy_sentence += "." + else: + yoy_sentence = "Insufficient data to assess year-on-year transitions." + + parts = [sentence1, sentence2, sentence3] + if sentence4: + parts.append(sentence4) + parts.append(yoy_sentence) + + return " ".join(parts) # ============================================================================= @@ -944,56 +1013,172 @@ class IndicatorNormAggregator: def _build_narrative_table(self, df_final: pd.DataFrame): """ - Pipeline agg_narrative_indicator yang dijalankan otomatis - setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada - di memori, tanpa re-load dari BigQuery. + Pipeline agg_narrative_indicator — granularity: per indicator_id (1 baris per indikator). + Narasi merangkum seluruh periode + seluruh negara ASEAN. + Dijalankan otomatis setelah agg_indicator_norm selesai. """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 12-16: agg_narrative_indicator") - self.logger.info(" Level : ASEAN (year x indicator_id)") + self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)") self.logger.info("=" * 80) - - # -- STEP 12: Agregasi ke level ASEAN -- - self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---") + + # -- STEP 12: Hitung statistik agregat per (indicator_id, country_id, year) -- + self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---") + + df = df_final.copy() + + # Dimensi tetap per indikator dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - agg_dict = {col: "first" for col in dim_cols} - agg_dict["value"] = "mean" - agg_dict["norm_score_1_100"] = "mean" - agg_dict["country_id"] = "count" - - df_agg = ( - df_final.groupby(["year", "indicator_id"]) - .agg(agg_dict) + + # ---- 12a. ASEAN avg per (indicator_id, year) -> untuk first/last & YoY --- + df_yr = ( + df.groupby(["indicator_id", "year"]) + .agg( + avg_value =("value", "mean"), + avg_norm_score =("norm_score_1_100", "mean"), + n_countries_year =("country_id", "nunique"), + ) .reset_index() - .rename(columns={ - "value" : "avg_value", - "norm_score_1_100": "avg_norm_score_1_100", - "country_id" : "n_countries", - }) ) - self.logger.info(f" Rows : {len(df_agg):,}") - self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}") - self.logger.info( - f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}" + + # ---- 12b. first year / last year avg value per indikator ----------------- + df_first = ( + df_yr.sort_values("year") + .groupby("indicator_id") + .first() + .reset_index()[["indicator_id", "year", "avg_value"]] + .rename(columns={"year": "year_min", "avg_value": "avg_value_first"}) ) - - # -- STEP 13: YoY avg_value per indikator -- - self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---") - parts = [] - for ind_id, grp in df_agg.groupby("indicator_id"): + df_last = ( + df_yr.sort_values("year") + .groupby("indicator_id") + .last() + .reset_index()[["indicator_id", "year", "avg_value"]] + .rename(columns={"year": "year_max", "avg_value": "avg_value_last"}) + ) + + # ---- 12c. Rata-rata norm_score seluruh periode ---------------------------- + df_score_avg = ( + df_yr.groupby("indicator_id") + .agg(avg_norm_score_1_100=("avg_norm_score", "mean")) + .reset_index() + ) + + # ---- 12d. n_countries: maks negara yang pernah hadir --------------------- + df_nc = ( + df.groupby("indicator_id")["country_id"] + .nunique() + .reset_index() + .rename(columns={"country_id": "n_countries"}) + ) + + # ---- 12e. YoY per (indicator_id) di level ASEAN avg ---------------------- + self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---") + + yoy_parts = [] + for ind_id, grp in df_yr.groupby("indicator_id"): grp = grp.sort_values("year").copy() - grp["prev_avg_value"] = grp["avg_value"].shift(1) - grp["yoy_avg_value"] = np.where( - grp["avg_value"].notna() & grp["prev_avg_value"].notna(), - grp["avg_value"] - grp["prev_avg_value"], + grp["prev_avg"] = grp["avg_value"].shift(1) + grp["yoy"] = np.where( + grp["avg_value"].notna() & grp["prev_avg"].notna(), + grp["avg_value"] - grp["prev_avg"], np.nan, ) - grp = grp.drop(columns=["prev_avg_value"]) - parts.append(grp) - df_agg = pd.concat(parts, ignore_index=True) - self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}") - - # -- STEP 14: Assign performance -- + grp = grp.drop(columns=["prev_avg"]) + yoy_parts.append(grp) + df_yr = pd.concat(yoy_parts, ignore_index=True) + + # Ambil direction per indikator untuk tentukan "improving" + dir_map = ( + df[["indicator_id", "direction"]] + .drop_duplicates(subset=["indicator_id"]) + .set_index("indicator_id")["direction"] + .to_dict() + ) + + def _is_positive_yoy(ind_id, yoy_val): + """True jika perubahan yoy menguntungkan sesuai direction.""" + if pd.isna(yoy_val): + return False + lb = _is_lower_better(dir_map.get(ind_id, "positive")) + return (yoy_val < 0) if lb else (yoy_val > 0) + + # Hitung n_yoy_total, n_yoy_positive, best_yoy + yoy_stats = [] + for ind_id, grp in df_yr.groupby("indicator_id"): + grp_yoy = grp[grp["yoy"].notna()].copy() + lb = _is_lower_better(dir_map.get(ind_id, "positive")) + + n_total = len(grp_yoy) + n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"])) + + # "Best" = perubahan paling menguntungkan + if n_total > 0: + if lb: + idx_best = grp_yoy["yoy"].idxmin() # paling negatif = paling baik + else: + idx_best = grp_yoy["yoy"].idxmax() # paling positif = paling baik + best_row = grp_yoy.loc[idx_best] + best_yoy_from = best_row["year"] - 1 + best_yoy_to = best_row["year"] + else: + best_yoy_from = np.nan + best_yoy_to = np.nan + + yoy_stats.append({ + "indicator_id" : ind_id, + "n_yoy_total" : n_total, + "n_yoy_positive": n_positive, + "best_yoy_from" : best_yoy_from, + "best_yoy_to" : best_yoy_to, + }) + + df_yoy_stats = pd.DataFrame(yoy_stats) + + # ---- 12f. Country terbaik & terburuk (rata-rata value seluruh periode) --- + df_country_avg = ( + df.groupby(["indicator_id", "country_id", "country_name"]) + .agg(country_avg_value=("value", "mean")) + .reset_index() + ) + + country_stats = [] + for ind_id, grp in df_country_avg.groupby("indicator_id"): + lb = _is_lower_better(dir_map.get(ind_id, "positive")) + if lb: + worst_row = grp.loc[grp["country_avg_value"].idxmax()] + best_row = grp.loc[grp["country_avg_value"].idxmin()] + else: + worst_row = grp.loc[grp["country_avg_value"].idxmin()] + best_row = grp.loc[grp["country_avg_value"].idxmax()] + country_stats.append({ + "indicator_id": ind_id, + "country_worst": worst_row["country_name"], + "country_best" : best_row["country_name"], + }) + df_country_stats = pd.DataFrame(country_stats) + + # ---- 12g. Dimensi tetap per indikator ------------------------------------ + df_dim = ( + df[["indicator_id"] + dim_cols] + .drop_duplicates(subset=["indicator_id"]) + ) + + # ---- 12h. Merge semua ------------------------------------------------------- + df_agg = ( + df_dim + .merge(df_first, on="indicator_id", how="left") + .merge(df_last, on="indicator_id", how="left") + .merge(df_score_avg, on="indicator_id", how="left") + .merge(df_nc, on="indicator_id", how="left") + .merge(df_yoy_stats, on="indicator_id", how="left") + .merge(df_country_stats,on="indicator_id", how="left") + ) + + self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}") + self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}") + + # -- STEP 14: Assign performance -------------------------------------------- self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") df_agg["performance"] = pd.NA has_score = df_agg["avg_norm_score_1_100"].notna() @@ -1002,69 +1187,89 @@ class IndicatorNormAggregator: n_good = (df_agg["performance"] == "Good").sum() n_bad = (df_agg["performance"] == "Bad").sum() self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") - - # -- STEP 15: Build narrative -- - self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---") - df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1) + + # -- STEP 15: Build narrative ----------------------------------------------- + self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---") + df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1) self.logger.info(f" Narratives generated: {len(df_agg):,}") self.logger.info("\n Sample (first 2):") for _, row in df_agg.head(2).iterrows(): self.logger.info( - f"\n [{int(row['year'])}] {row['indicator_name'][:50]}" - f"\n -> {row['narrative'][:250]}..." + f"\n [{int(row['indicator_id'])}] {row['indicator_name'][:60]}" + f"\n -> {row['narrative'][:300]}..." ) - - # -- STEP 16: Save -- + + # -- STEP 16: Save ---------------------------------------------------------- self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") out = df_agg[[ - "year", "indicator_id", "indicator_name", "unit", "direction", + "indicator_id", "indicator_name", "unit", "direction", "pillar_name", "framework", - "avg_value", "avg_norm_score_1_100", "performance", - "yoy_avg_value", "n_countries", "narrative", + "year_min", "year_max", "n_countries", + "avg_value_first", "avg_value_last", + "avg_norm_score_1_100", "performance", + "n_yoy_total", "n_yoy_positive", + "best_yoy_from", "best_yoy_to", + "country_worst", "country_best", + "narrative", ]].copy() - - out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True) - - out["year"] = out["year"].astype(int) + + out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True) + + # Cast out["indicator_id"] = out["indicator_id"].astype(int) out["indicator_name"] = out["indicator_name"].astype(str) out["unit"] = out["unit"].fillna("").astype(str) out["direction"] = out["direction"].astype(str) out["pillar_name"] = out["pillar_name"].astype(str) out["framework"] = out["framework"].astype(str) - out["avg_value"] = out["avg_value"].astype(float) - out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float) - out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") - out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float) + out["year_min"] = out["year_min"].astype(int) + out["year_max"] = out["year_max"].astype(int) out["n_countries"] = out["n_countries"].astype(int) + out["avg_value_first"] = pd.to_numeric(out["avg_value_first"], errors="coerce").astype(float) + out["avg_value_last"] = pd.to_numeric(out["avg_value_last"], errors="coerce").astype(float) + out["avg_norm_score_1_100"] = pd.to_numeric(out["avg_norm_score_1_100"], errors="coerce").astype(float) + out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") + out["n_yoy_total"] = pd.to_numeric(out["n_yoy_total"], errors="coerce").astype("Int64") + out["n_yoy_positive"] = pd.to_numeric(out["n_yoy_positive"], errors="coerce").astype("Int64") + out["best_yoy_from"] = pd.to_numeric(out["best_yoy_from"], errors="coerce").astype("Int64") + out["best_yoy_to"] = pd.to_numeric(out["best_yoy_to"], errors="coerce").astype("Int64") + out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string") + out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string") out["narrative"] = out["narrative"].astype(str) - + schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("year_min", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_max", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("avg_value_first", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("avg_value_last", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), - bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_yoy_total", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("n_yoy_positive", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("best_yoy_from", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("best_yoy_to", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("country_worst", "STRING", mode="NULLABLE"), + bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"), bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), ] - + rows_loaded = load_to_bigquery( self.client, out, "agg_narrative_indicator", layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema, ) - + log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded) self.logger.info( f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold" ) - + metadata = { "source_class" : self.__class__.__name__, "table_name" : "agg_narrative_indicator", @@ -1076,23 +1281,21 @@ class IndicatorNormAggregator: "completeness_pct" : 100.0, "config_snapshot" : json.dumps({ "source_table" : "agg_indicator_norm (in-memory df_final)", - "granularity" : "year x indicator_id (ASEAN level)", - "aggregation" : "mean across ASEAN countries", + "granularity" : "indicator_id only (all years, all ASEAN countries)", + "aggregation" : "full-period summary per indicator", "performance_threshold": _PERFORMANCE_THRESHOLD, - "yoy_column" : "yoy_avg_value", "layer" : "gold", }), "validation_metrics" : json.dumps({ "total_rows" : rows_loaded, "n_indicators": int(out["indicator_id"].nunique()), - "year_min" : int(out["year"].min()), - "year_max" : int(out["year"].max()), }), } save_etl_metadata(self.client, metadata) self.logger.info(" Metadata -> [AUDIT] etl_metadata") - + self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded + # ========================================================================= # RUN