From e00e9c569dd6f312c70128a0cd2cf85ac130696a Mon Sep 17 00:00:00 2001 From: Debby Date: Tue, 7 Apr 2026 20:04:52 +0700 Subject: [PATCH] sum indicator problem solve --- .../bigquery_aggraget_fact_selected_layer.py | 465 ++++++++++++------ 1 file changed, 321 insertions(+), 144 deletions(-) diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index 94ffb1a..67fb719 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -247,91 +247,140 @@ def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple: def _build_narrative(row: pd.Series) -> str: """ - Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id). + Bangun 1 paragraf narasi per indikator, merangkum seluruh negara ASEAN + dan seluruh tahun yang tersedia. """ - year = int(row["year"]) - ind_name = str(row["indicator_name"]).strip() - unit = str(row["unit"]).strip() if row["unit"] else "" - direction = str(row["direction"]).strip() - pillar = str(row["pillar_name"]).strip() - framework = str(row["framework"]).strip() - avg_val = row["avg_value"] - avg_score = row["avg_norm_score_1_100"] - performance = row["performance"] - yoy = row["yoy_avg_value"] - n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0 + ind_name = str(row["indicator_name"]).strip() + unit = str(row["unit"]).strip() if row["unit"] else "" + direction = str(row["direction"]).strip() + pillar = str(row["pillar_name"]).strip() + framework = str(row["framework"]).strip() + year_min = int(row["year_min"]) + year_max = int(row["year_max"]) + n_years = int(row["n_years"]) + n_countries = int(row["n_countries"]) + avg_score = row["avg_norm_score_1_100"] + performance = str(row["performance"]) if not pd.isna(row["performance"]) else None + first_val = row["asean_avg_first"] + last_val = row["asean_avg_last"] + cum_change = row["cumulative_change"] + best_country = str(row["best_country"]).strip() + worst_country = str(row["worst_country"]).strip() + yoy_improved = int(row["yoy_improved_count"]) if not pd.isna(row["yoy_improved_count"]) else 0 + yoy_total = int(row["yoy_total_transitions"]) if not pd.isna(row["yoy_total_transitions"]) else 0 + best_year = int(row["best_year"]) if not pd.isna(row["best_year"]) else None + worst_year = int(row["worst_year"]) if not pd.isna(row["worst_year"]) else None lower_better = _is_lower_better(direction) direction_label = ( - "lower values indicate better outcomes" if lower_better - else "higher values indicate better outcomes" + "lower values indicate better outcomes" + if lower_better else "higher values indicate better outcomes" ) - # --- Bagian 1: Nilai rata-rata ASEAN --- - val_str = _format_value(avg_val, unit) - sentence1 = ( - f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}" + year_span = ( + f"from {year_min} to {year_max}" + if year_min != year_max else f"in {year_min}" ) - if n_countries > 0: - sentence1 += ( - f", based on data from {n_countries} " - f"ASEAN member state{'s' if n_countries > 1 else ''}" + n_years_label = f"{n_years} year{'s' if n_years > 1 else ''} of data" + + # --- Bagian 1: Pembuka --- + s1 = ( + f"The {ind_name} under the {framework} framework ({pillar} pillar) " + f"was monitored {year_span} across {n_countries} ASEAN member state{'s' if n_countries > 1 else ''}, " + f"covering {n_years_label}." + ) + + # --- Bagian 2: Arah interpretasi --- + s2 = f"Since {direction_label} for this indicator, performance is evaluated accordingly." + + # --- Bagian 3: Trend kumulatif (first -> last) --- + if not pd.isna(first_val) and not pd.isna(last_val) and not pd.isna(cum_change): + first_str = _format_value(first_val, unit) + last_str = _format_value(last_val, unit) + chg_str = _format_value(abs(cum_change), unit) + + if lower_better: + is_improving = cum_change < 0 + else: + is_improving = cum_change > 0 + + moved = "declined" if cum_change < 0 else "increased" + trend_lbl = "an overall improving trend" if is_improving else "an overall deteriorating trend" + + s3 = ( + f"The ASEAN regional average {moved} from {first_str} in {year_min} " + f"to {last_str} in {year_max}, a cumulative shift of {chg_str}, " + f"indicating {trend_lbl} over the observed period." ) - sentence1 += "." + else: + s3 = "Insufficient data is available to assess the cumulative trend across the period." - # --- Bagian 2: Score dan performance --- + # --- Bagian 4: Score dan performance --- if not pd.isna(avg_score): score_str = f"{avg_score:.1f} out of 100" if performance == "Good": - perf_phrase = ( - f"The region achieved a normalized score of {score_str}, " - f"classified as Good performance meeting the 60-point threshold " - f"under the {framework} framework ({pillar} pillar)." + s4 = ( + f"The average normalized score across all years and countries stood at {score_str}, " + f"placing the region in the Good performance category and consistently above the 60-point threshold." ) elif performance == "Bad": - perf_phrase = ( - f"The region recorded a normalized score of {score_str}, " - f"classified as Bad performance falling below the 60-point threshold " - f"under the {framework} framework ({pillar} pillar)." + s4 = ( + f"The average normalized score across all years and countries stood at {score_str}, " + f"placing the region in the Bad performance category and below the 60-point threshold." ) else: - perf_phrase = ( - f"The region recorded a normalized score of {score_str} " - f"under the {framework} framework ({pillar} pillar)." - ) + s4 = f"The average normalized score across all years and countries stood at {score_str}." else: - perf_phrase = ( - f"Performance could not be assessed due to insufficient data " - f"under the {framework} framework ({pillar} pillar)." + s4 = "The normalized score could not be assessed due to insufficient data." + + # --- Bagian 5: Best dan worst country --- + if best_country and worst_country and best_country != worst_country: + s5 = ( + f"Among member states, {best_country} recorded the strongest performance throughout the period, " + f"while {worst_country} showed the weakest performance." ) + elif best_country: + s5 = f"{best_country} recorded the strongest overall performance among ASEAN member states." + else: + s5 = "" - # --- Bagian 3 & 4: Arah + YoY --- - direction_phrase = f"Since {direction_label} for this indicator" + # --- Bagian 6: Best dan worst year --- + if best_year and worst_year and best_year != worst_year: + s6 = ( + f"The best regional score was recorded in {best_year}, " + f"while {worst_year} represented the weakest year across the region." + ) + elif best_year: + s6 = f"The strongest regional performance was recorded in {best_year}." + else: + s6 = "" - if not pd.isna(yoy) and yoy != 0: - direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better) - if is_positive: - trend_word = "a positive trend" - tone = "reflecting improvements in regional food security performance" + # --- Bagian 7: YoY consistency --- + if yoy_total > 0: + yoy_worsened = yoy_total - yoy_improved + if yoy_improved > yoy_worsened: + consistency = "predominantly positive" + elif yoy_improved < yoy_worsened: + consistency = "predominantly negative" else: - trend_word = "a deteriorating trend" - tone = "signaling the need for greater regional attention and policy response" - yoy_phrase = ( - f"{direction_phrase}, the regional average {direction_word} {change_desc} " - f"compared to {year - 1}, reflecting {trend_word} — {tone}." - ) - elif pd.isna(yoy): - yoy_phrase = ( - f"No prior year data is available for comparison, " - f"as this is the earliest recorded year for this indicator in the dataset." + consistency = "mixed" + s7 = ( + f"Year-on-year, the region showed improvement in {yoy_improved} out of {yoy_total} " + f"transitions, reflecting a {consistency} trajectory over the period." ) else: - yoy_phrase = ( - f"{direction_phrase}, the regional average remained stable " - f"compared to {year - 1}, with no measurable change year-on-year." - ) + s7 = "" - return f"{sentence1} {perf_phrase} {yoy_phrase}" + # --- Gabungkan semua bagian --- + parts = [s1, s2, s3, s4] + if s5: + parts.append(s5) + if s6: + parts.append(s6) + if s7: + parts.append(s7) + + return " ".join(parts) # ============================================================================= @@ -947,112 +996,234 @@ class IndicatorNormAggregator: Pipeline agg_narrative_indicator yang dijalankan otomatis setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada di memori, tanpa re-load dari BigQuery. + + Granularity: per indicator_id (1 baris = 1 indikator, + merangkum SELURUH negara ASEAN + SELURUH tahun yang tersedia). + Total rows = jumlah indikator unik. """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 12-16: agg_narrative_indicator") - self.logger.info(" Level : ASEAN (year x indicator_id)") + self.logger.info(" Level : per indicator_id (all ASEAN countries x all years)") self.logger.info("=" * 80) - # -- STEP 12: Agregasi ke level ASEAN -- - self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---") + df = df_final.copy() + + # -- STEP 12: Agregasi per indicator_id (all countries, all years) -- + self.logger.info("\n--- STEP 12: AGGREGATE PER INDICATOR (all countries x all years) ---") + dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - agg_dict = {col: "first" for col in dim_cols} - agg_dict["value"] = "mean" - agg_dict["norm_score_1_100"] = "mean" - agg_dict["country_id"] = "count" - df_agg = ( - df_final.groupby(["year", "indicator_id"]) - .agg(agg_dict) - .reset_index() - .rename(columns={ - "value" : "avg_value", - "norm_score_1_100": "avg_norm_score_1_100", - "country_id" : "n_countries", - }) - ) - self.logger.info(f" Rows : {len(df_agg):,}") - self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}") - self.logger.info( - f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}" - ) - - # -- STEP 13: YoY avg_value per indikator -- - self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---") - parts = [] - for ind_id, grp in df_agg.groupby("indicator_id"): - grp = grp.sort_values("year").copy() - grp["prev_avg_value"] = grp["avg_value"].shift(1) - grp["yoy_avg_value"] = np.where( - grp["avg_value"].notna() & grp["prev_avg_value"].notna(), - grp["avg_value"] - grp["prev_avg_value"], - np.nan, + # Agregat dasar + base = ( + df.groupby("indicator_id") + .agg( + indicator_name = ("indicator_name", "first"), + unit = ("unit", "first"), + direction = ("direction", "first"), + pillar_name = ("pillar_name", "first"), + framework = ("framework", "first"), + year_min = ("year", "min"), + year_max = ("year", "max"), + n_years = ("year", "nunique"), + n_countries = ("country_id", "nunique"), + avg_norm_score_1_100 = ("norm_score_1_100", "mean"), ) - grp = grp.drop(columns=["prev_avg_value"]) - parts.append(grp) - df_agg = pd.concat(parts, ignore_index=True) - self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}") + .reset_index() + ) - # -- STEP 14: Assign performance -- - self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---") - df_agg["performance"] = pd.NA - has_score = df_agg["avg_norm_score_1_100"].notna() - df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" - df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" - n_good = (df_agg["performance"] == "Good").sum() - n_bad = (df_agg["performance"] == "Bad").sum() + # ASEAN avg value per tahun -> ambil first year & last year value + asean_yr = ( + df.groupby(["indicator_id", "year"])["value"] + .mean() + .reset_index() + .rename(columns={"value": "avg_val_yr"}) + ) + + def _first_last(grp): + grp = grp.sort_values("year") + return pd.Series({ + "asean_avg_first": grp["avg_val_yr"].iloc[0], + "asean_avg_last" : grp["avg_val_yr"].iloc[-1], + }) + + first_last = ( + asean_yr.groupby("indicator_id") + .apply(_first_last) + .reset_index() + ) + base = base.merge(first_last, on="indicator_id", how="left") + base["cumulative_change"] = base["asean_avg_last"] - base["asean_avg_first"] + + # Best & worst country (berdasarkan avg norm_score_1_100 per negara per indikator) + country_score = ( + df.groupby(["indicator_id", "country_name"])["norm_score_1_100"] + .mean() + .reset_index() + ) + + def _best_worst_country(grp): + grp = grp.sort_values("norm_score_1_100") + return pd.Series({ + "worst_country": grp["country_name"].iloc[0], + "best_country" : grp["country_name"].iloc[-1], + }) + + bw_country = ( + country_score.groupby("indicator_id") + .apply(_best_worst_country) + .reset_index() + ) + base = base.merge(bw_country, on="indicator_id", how="left") + + # Best & worst year (berdasarkan avg norm_score_1_100 ASEAN per tahun) + yr_score = ( + df.groupby(["indicator_id", "year"])["norm_score_1_100"] + .mean() + .reset_index() + ) + + def _best_worst_year(grp): + grp = grp.sort_values("norm_score_1_100") + return pd.Series({ + "worst_year": int(grp["year"].iloc[0]), + "best_year" : int(grp["year"].iloc[-1]), + }) + + bw_year = ( + yr_score.groupby("indicator_id") + .apply(_best_worst_year) + .reset_index() + ) + base = base.merge(bw_year, on="indicator_id", how="left") + + # YoY consistency: hitung berapa transisi membaik vs memburuk + # (berdasarkan avg ASEAN value per tahun, direction-aware) + def _yoy_consistency(grp): + ind_id = grp["indicator_id"].iloc[0] + direction = base.loc[base["indicator_id"] == ind_id, "direction"].iloc[0] + lb = _is_lower_better(direction) + grp = grp.sort_values("year") + diffs = grp["avg_val_yr"].diff().dropna() + if lb: + improved = (diffs < 0).sum() + else: + improved = (diffs > 0).sum() + return pd.Series({ + "yoy_improved_count" : int(improved), + "yoy_total_transitions": int(len(diffs)), + }) + + yoy_cons = ( + asean_yr.groupby("indicator_id") + .apply(_yoy_consistency) + .reset_index() + ) + base = base.merge(yoy_cons, on="indicator_id", how="left") + + self.logger.info(f" Rows (= n_indicators): {len(base):,}") + self.logger.info(f" Years span : {int(base['year_min'].min())} - {int(base['year_max'].max())}") + + # -- STEP 13: Assign performance berdasarkan avg_norm_score_1_100 -- + self.logger.info("\n--- STEP 13: ASSIGN PERFORMANCE ---") + base["performance"] = pd.NA + has_score = base["avg_norm_score_1_100"].notna() + base.loc[has_score & (base["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" + base.loc[has_score & (base["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" + n_good = (base["performance"] == "Good").sum() + n_bad = (base["performance"] == "Bad").sum() self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}") - # -- STEP 15: Build narrative -- - self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---") - df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1) - self.logger.info(f" Narratives generated: {len(df_agg):,}") + # -- STEP 14: Build narrative -- + self.logger.info("\n--- STEP 14: BUILD NARRATIVE ---") + base["narrative"] = base.apply(_build_narrative, axis=1) + self.logger.info(f" Narratives generated: {len(base):,}") self.logger.info("\n Sample (first 2):") - for _, row in df_agg.head(2).iterrows(): + for _, row in base.head(2).iterrows(): self.logger.info( - f"\n [{int(row['year'])}] {row['indicator_name'][:50]}" - f"\n -> {row['narrative'][:250]}..." + f"\n [{row['indicator_name'][:55]}]" + f"\n -> {row['narrative'][:300]}..." ) - # -- STEP 16: Save -- - self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---") - out = df_agg[[ - "year", "indicator_id", "indicator_name", "unit", "direction", - "pillar_name", "framework", - "avg_value", "avg_norm_score_1_100", "performance", - "yoy_avg_value", "n_countries", "narrative", + # -- STEP 15: Save -- + self.logger.info("\n--- STEP 15: SAVE -> [Gold] agg_narrative_indicator ---") + out = base[[ + "indicator_id", + "indicator_name", + "unit", + "direction", + "pillar_name", + "framework", + "year_min", + "year_max", + "n_years", + "n_countries", + "asean_avg_first", + "asean_avg_last", + "cumulative_change", + "avg_norm_score_1_100", + "performance", + "best_country", + "worst_country", + "best_year", + "worst_year", + "yoy_improved_count", + "yoy_total_transitions", + "narrative", ]].copy() - out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True) + out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True) - out["year"] = out["year"].astype(int) out["indicator_id"] = out["indicator_id"].astype(int) out["indicator_name"] = out["indicator_name"].astype(str) out["unit"] = out["unit"].fillna("").astype(str) out["direction"] = out["direction"].astype(str) out["pillar_name"] = out["pillar_name"].astype(str) out["framework"] = out["framework"].astype(str) - out["avg_value"] = out["avg_value"].astype(float) + out["year_min"] = out["year_min"].astype(int) + out["year_max"] = out["year_max"].astype(int) + out["n_years"] = out["n_years"].astype(int) + out["n_countries"] = out["n_countries"].astype(int) + out["asean_avg_first"] = out["asean_avg_first"].astype(float) + out["asean_avg_last"] = out["asean_avg_last"].astype(float) + out["cumulative_change"] = out["cumulative_change"].astype(float) out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float) out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") - out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float) - out["n_countries"] = out["n_countries"].astype(int) + out["best_country"] = out["best_country"].astype(str) + out["worst_country"] = out["worst_country"].astype(str) + out["best_year"] = pd.to_numeric(out["best_year"], errors="coerce").astype("Int64") + out["worst_year"] = pd.to_numeric(out["worst_year"], errors="coerce").astype("Int64") + out["yoy_improved_count"] = pd.to_numeric(out["yoy_improved_count"], errors="coerce").astype("Int64") + out["yoy_total_transitions"]= pd.to_numeric(out["yoy_total_transitions"], errors="coerce").astype("Int64") out["narrative"] = out["narrative"].astype(str) + self.logger.info(f" Columns : {list(out.columns)}") + self.logger.info(f" Total rows : {len(out):,}") + self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}") + self.logger.info(f" Performance: {dict(out['performance'].value_counts())}") + schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), - bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year_min", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_max", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_years", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("asean_avg_first", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("asean_avg_last", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("cumulative_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), + bigquery.SchemaField("best_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("worst_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("best_year", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("worst_year", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("yoy_improved_count", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("yoy_total_transitions", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -1076,17 +1247,23 @@ class IndicatorNormAggregator: "completeness_pct" : 100.0, "config_snapshot" : json.dumps({ "source_table" : "agg_indicator_norm (in-memory df_final)", - "granularity" : "year x indicator_id (ASEAN level)", - "aggregation" : "mean across ASEAN countries", + "granularity" : "indicator_id (all ASEAN countries x all years)", + "aggregation" : "mean across all countries and years per indicator", "performance_threshold": _PERFORMANCE_THRESHOLD, - "yoy_column" : "yoy_avg_value", - "layer" : "gold", + "narrative_components" : [ + "year span", "n_countries", "direction interpretation", + "cumulative trend (first to last year)", + "avg_norm_score + performance", + "best/worst country", "best/worst year", + "YoY consistency count", + ], + "layer": "gold", }), "validation_metrics" : json.dumps({ "total_rows" : rows_loaded, "n_indicators": int(out["indicator_id"].nunique()), - "year_min" : int(out["year"].min()), - "year_max" : int(out["year"].max()), + "year_min" : int(out["year_min"].min()), + "year_max" : int(out["year_max"].max()), }), } save_etl_metadata(self.client, metadata)