diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index 5825b90..cd10e73 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -17,16 +17,24 @@ Framework Classification Logic: * "SDGs" untuk year >= sdgs_start_year - Indikator yang TIDAK ada dalam SDG_ONLY_KEYWORDS selalu "MDGs". +YoY Logic: + - yoy_value : selisih absolut value vs tahun sebelumnya (per indikator, negara) + - yoy_norm_value : selisih absolut norm_value vs tahun sebelumnya + +Performance Label Logic: + - performance : "Good" jika norm_score_1_100 >= 60, "Bad" jika < 60, null jika null + Output Schema (agg_indicator_norm): year, country_id, country_name, - indicator_id, indicator_name, direction, + indicator_id, indicator_name, unit, direction, pillar_id, pillar_name, framework, -- "MDGs" | "SDGs" value, -- raw value asli norm_value, -- 0-1, direction sudah diperhitungkan norm_score_1_100, -- scaled 1-100 (global per indikator) - rank_in_indicator_year, -- rank negara di dalam satu indikator & tahun - rank_in_country_year -- rank indikator di dalam satu negara & tahun + yoy_value, -- perubahan absolut value YoY + yoy_norm_value, -- perubahan absolut norm_value YoY + performance -- "Good" | "Bad" | null """ import pandas as pd @@ -85,7 +93,6 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([ _SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS) # FIES-specific keywords untuk deteksi sdgs_start_year -# (indikator yang HANYA muncul setelah SDGs era dimulai) _FIES_DETECTION_KEYWORDS: frozenset = frozenset([ "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", @@ -101,6 +108,9 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({ "positive", "higher_better", "higher_is_better", }) +# Threshold performance label +_PERFORMANCE_THRESHOLD: float = 60.0 + # ============================================================================= # PURE HELPERS @@ -133,6 +143,37 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S return pd.Series(result, index=series.index) +def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame: + """ + Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year. + + Kolom yang ditambahkan: + yoy_value : value - value_prev + yoy_norm_value : norm_value - norm_value_prev + + Baris pertama tiap grup selalu null (tidak ada tahun sebelumnya). + """ + df = df.sort_values("year").copy() + + df["value_prev"] = df["value"].shift(1) + df["norm_value_prev"] = df["norm_value"].shift(1) + + df["yoy_value"] = np.where( + df["value"].notna() & df["value_prev"].notna(), + df["value"] - df["value_prev"], + np.nan, + ) + + df["yoy_norm_value"] = np.where( + df["norm_value"].notna() & df["norm_value_prev"].notna(), + df["norm_value"] - df["norm_value_prev"], + np.nan, + ) + + df = df.drop(columns=["value_prev", "norm_value_prev"]) + return df + + # ============================================================================= # MAIN CLASS # ============================================================================= @@ -144,12 +185,15 @@ class IndicatorNormAggregator: Alur: 1. Load fact_asean_food_security_selected - 2. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) - 3. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label - 4. Hitung norm_value per indikator (direction-aware, 0-1) - 5. Scale ke 1-100 per indikator (global) - 6. Hitung rank_in_indicator_year & rank_in_country_year - 7. Simpan ke BigQuery + 2. Load dim_indicator -> ambil kolom unit + 3. Merge unit ke df + 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) + 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label + 6. Hitung norm_value per indikator (direction-aware, 0-1) + 7. Hitung YoY per (indicator_id, country_id) + 8. Scale ke 1-100 per indikator (global) + 9. Assign performance label (Good/Bad) + 10. Simpan ke BigQuery """ def __init__(self, client: bigquery.Client): @@ -158,6 +202,7 @@ class IndicatorNormAggregator: self.logger.propagate = False self.df = None + self.df_unit = None self.sdgs_start_year = None self.pipeline_start = None @@ -169,7 +214,7 @@ class IndicatorNormAggregator: } # ========================================================================= - # STEP 1: Load + # STEP 1: Load fact table # ========================================================================= def load_data(self): @@ -205,22 +250,80 @@ class IndicatorNormAggregator: ) # ========================================================================= - # STEP 2: Deteksi sdgs_start_year + # STEP 2: Load unit dari dim_indicator + # ========================================================================= + + def load_units(self): + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 2: LOAD UNIT — dim_indicator") + self.logger.info("=" * 80) + + dim = read_from_bigquery(self.client, "dim_indicator", layer="gold") + + if "indicator_id" not in dim.columns or "unit" not in dim.columns: + raise ValueError( + f"dim_indicator harus punya kolom 'indicator_id' dan 'unit'. " + f"Kolom tersedia: {list(dim.columns)}" + ) + + self.df_unit = ( + dim[["indicator_id", "unit"]] + .drop_duplicates(subset=["indicator_id"]) + .copy() + ) + self.df_unit["indicator_id"] = self.df_unit["indicator_id"].astype(int) + self.df_unit["unit"] = self.df_unit["unit"].fillna("").astype(str) + + n_missing_unit = self.df_unit["unit"].eq("").sum() + self.logger.info(f" dim_indicator rows (unique indicator_id): {len(self.df_unit):,}") + self.logger.info(f" Indicator dengan unit kosong : {n_missing_unit}") + + fact_ids = set(self.df["indicator_id"].astype(int).unique()) + dim_ids = set(self.df_unit["indicator_id"].unique()) + orphan = fact_ids - dim_ids + if orphan: + self.logger.warning( + f" [WARNING] {len(orphan)} indicator_id di fact tidak ditemukan di " + f"dim_indicator (unit akan diisi ''): {sorted(orphan)}" + ) + + # ========================================================================= + # STEP 3: Merge unit ke df + # ========================================================================= + + def _merge_unit(self): + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 3: MERGE UNIT -> fact df") + self.logger.info("=" * 80) + + before = len(self.df) + self.df = self.df.merge(self.df_unit, on="indicator_id", how="left") + self.df["unit"] = self.df["unit"].fillna("").astype(str) + after = len(self.df) + + assert before == after, ( + f"Row count berubah setelah merge unit: {before} -> {after}" + ) + + n_empty = self.df["unit"].eq("").sum() + self.logger.info( + f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}" + ) + unique_units = self.df["unit"].value_counts().to_dict() + self.logger.info(" Distribusi unit (top 10):") + for u, cnt in list(unique_units.items())[:10]: + label = repr(u) if u == "" else u + self.logger.info(f" {label:<30}: {cnt:,} rows") + + # ========================================================================= + # STEP 4: Deteksi sdgs_start_year # ========================================================================= def _detect_sdgs_start_year(self) -> int: - """ - sdgs_start_year = tahun pertama FIES hadir di data. - FIES = indikator yang ada di _FIES_DETECTION_LOWER. - - Fallback ke metode gap-terbesar pada min_year distribusi per indikator - jika FIES tidak ditemukan. - """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 2: DETECT sdgs_start_year (first FIES year)") + self.logger.info("STEP 4: DETECT sdgs_start_year (first FIES year)") self.logger.info("=" * 80) - # Metode 1: Explicit FIES detection fies_rows = self.df[ self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER) ] @@ -234,7 +337,6 @@ class IndicatorNormAggregator: self.logger.info(f" - {nm[:60]} (first year: {min_y})") return sdgs_start - # Fallback: gap-terbesar self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar") ind_min_year = ( self.df.groupby("indicator_id")["year"] @@ -262,58 +364,30 @@ class IndicatorNormAggregator: return sdgs_start # ========================================================================= - # STEP 3: Assign framework + # STEP 5: Assign framework # ========================================================================= def _assign_framework(self): - """ - Tambahkan kolom 'framework' ke self.df. - - Aturan per baris: - - Indikator TIDAK di SDG_ONLY_KEYWORDS: - framework = "MDGs" (selalu, semua tahun) - - - Indikator DI SDG_ONLY_KEYWORDS: - year < sdgs_start_year -> framework = "MDGs" - year >= sdgs_start_year -> framework = "SDGs" - - Contoh dual-label (indicator "prevalence of undernourishment"): - Jika data ada dari 2013 dan sdgs_start_year = 2019: - - Baris 2013-2018: framework = "MDGs" (masuk era MDGs) - - Baris 2019-dst : framework = "SDGs" (masuk era SDGs) - Sehingga indikator ini muncul di kedua framework tanpa duplikasi baris. - - Contoh FIES-only (indicator "prevalence of severe food insecurity"): - Data baru ada mulai 2019 (= sdgs_start_year): - - Semua baris: framework = "SDGs" - """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 3: ASSIGN FRAMEWORK PER BARIS") + self.logger.info("STEP 5: ASSIGN FRAMEWORK PER BARIS") self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}") self.logger.info("=" * 80) df = self.df.copy() - # Flag apakah indikator ada di SDG_ONLY_KEYWORDS df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER) + df["framework"] = "MDGs" - # Default semua MDGs - df["framework"] = "MDGs" - - # SDG_ONLY + year >= sdgs_start_year -> SDGs mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year) df.loc[mask_sdgs, "framework"] = "SDGs" - # Drop helper column df = df.drop(columns=["_is_sdg_kw"]) - # ---- Logging ---- fw_dist = df["framework"].value_counts() self.logger.info("\n Framework distribution (rows):") for fw, cnt in fw_dist.items(): self.logger.info(f" {fw:<6}: {cnt:,} rows") - # Cek berapa indikator punya dual-framework dual = ( df.groupby("indicator_id")["framework"] .nunique() @@ -336,29 +410,15 @@ class IndicatorNormAggregator: f" SDGs years: {sdgs_yrs}" ) - self.logger.info( - f"\n Indikator SDGs only (semua tahun = SDGs): " - f"{len(dual[(dual['n_frameworks'] == 1)].merge(df[df['framework'] == 'SDGs'][['indicator_id']].drop_duplicates(), on='indicator_id'))}" - ) - self.df = df # ========================================================================= - # STEP 4: Hitung norm_value per indikator (direction-aware) + # STEP 6: Hitung norm_value per indikator (direction-aware) # ========================================================================= def _compute_norm_values(self) -> pd.DataFrame: - """ - Normalisasi per indikator secara global (semua tahun & negara): - norm_value = (raw - min) / (max - min) [higher_better] - norm_value = 1 - (raw - min) / (max - min) [lower_better] - - Normalisasi dilakukan satu kali per indicator_id, - mencakup SEMUA baris (MDGs + SDGs dari indikator yang sama) - agar skor konsisten antar framework. - """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 4: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)") + self.logger.info("STEP 6: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)") self.logger.info("=" * 80) df = self.df.copy() @@ -400,40 +460,57 @@ class IndicatorNormAggregator: df_normed = pd.concat(norm_parts, ignore_index=True) - n_ind_computed = df_normed["indicator_id"].nunique() - self.logger.info(f" norm_value computed: {n_ind_computed} indicators") + self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators") self.logger.info( f" norm_value range : " f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}" ) - self.logger.info( - f" norm_value nulls : {df_normed['norm_value'].isna().sum()}" - ) + self.logger.info(f" norm_value nulls : {df_normed['norm_value'].isna().sum()}") return df_normed # ========================================================================= - # STEP 5: Scale ke 1-100, hitung rank + # STEP 7: Hitung YoY per (indicator_id, country_id) # ========================================================================= - def _compute_scores_and_ranks(self, df: pd.DataFrame) -> pd.DataFrame: - """ - norm_score_1_100: - Scale norm_value ke 1-100 secara global PER INDIKATOR - (semua tahun & negara dalam satu indikator di-scale bersama). - - rank_in_indicator_year: - Rank negara dalam satu (indicator_id, year). - rank=1 -> negara dengan norm_score terbaik untuk indikator tsb di tahun tsb. - - rank_in_country_year: - Rank indikator dalam satu (country_id, year). - rank=1 -> indikator dengan norm_score terbaik untuk negara tsb di tahun tsb. - """ + def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame: self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 5: SCALE TO 1-100 & COMPUTE RANKS") + self.logger.info("STEP 7: COMPUTE YoY COLUMNS (per indicator, per country)") + self.logger.info("=" * 80) + + parts = [] + groups = df.groupby(["indicator_id", "country_id"], sort=False) + self.logger.info(f" Processing {groups.ngroups:,} (indicator x country) groups...") + + for (ind_id, country_id), grp in groups: + parts.append(_compute_yoy(grp)) + + df_out = pd.concat(parts, ignore_index=True) + + self.logger.info( + f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}" + ) + self.logger.info( + f" yoy_value range : " + f"{df_out['yoy_value'].min():.4f} - {df_out['yoy_value'].max():.4f}" + ) + self.logger.info( + f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}" + ) + self.logger.info( + f" yoy_norm_value range: " + f"{df_out['yoy_norm_value'].min():.4f} - {df_out['yoy_norm_value'].max():.4f}" + ) + return df_out + + # ========================================================================= + # STEP 8: Scale ke 1-100 + # ========================================================================= + + def _compute_scores(self, df: pd.DataFrame) -> pd.DataFrame: + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 8: SCALE TO 1-100") self.logger.info("=" * 80) - # Scale per indikator score_parts = [] for ind_id, grp in df.groupby("indicator_id"): grp = grp.copy() @@ -441,41 +518,55 @@ class IndicatorNormAggregator: score_parts.append(grp) df = pd.concat(score_parts, ignore_index=True) - # rank_in_indicator_year: rank negara per (indicator, year) - df["rank_in_indicator_year"] = ( - df.groupby(["indicator_id", "year"])["norm_score_1_100"] - .rank(method="min", ascending=False) - .astype("Int64") - ) - - # rank_in_country_year: rank indikator per (country, year) - df["rank_in_country_year"] = ( - df.groupby(["country_id", "year"])["norm_score_1_100"] - .rank(method="min", ascending=False) - .astype("Int64") - ) - self.logger.info( - f" norm_score_1_100 range : " + f" norm_score_1_100 range: " f"{df['norm_score_1_100'].min():.2f} - {df['norm_score_1_100'].max():.2f}" ) - self.logger.info( - f" rank_in_indicator_year max: {df['rank_in_indicator_year'].max()}" - ) - self.logger.info( - f" rank_in_country_year max : {df['rank_in_country_year'].max()}" - ) return df # ========================================================================= - # STEP 6: Save to BigQuery + # STEP 9: Assign performance label + # ========================================================================= + + def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame: + """ + performance = "Good" jika norm_score_1_100 >= 60 + = "Bad" jika norm_score_1_100 < 60 + = null jika norm_score_1_100 null + """ + self.logger.info("\n" + "=" * 80) + self.logger.info( + f"STEP 9: ASSIGN PERFORMANCE LABEL " + f"(threshold >= {_PERFORMANCE_THRESHOLD} -> Good)" + ) + self.logger.info("=" * 80) + + df = df.copy() + df["performance"] = pd.NA + + has_score = df["norm_score_1_100"].notna() + df.loc[has_score & (df["norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" + df.loc[has_score & (df["norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" + + n_good = (df["performance"] == "Good").sum() + n_bad = (df["performance"] == "Bad").sum() + n_null = df["performance"].isna().sum() + total = len(df) + + self.logger.info(f" Good : {n_good:,} ({n_good/total*100:.1f}%)") + self.logger.info(f" Bad : {n_bad:,} ({n_bad/total*100:.1f}%)") + self.logger.info(f" Null : {n_null:,} ({n_null/total*100:.1f}%)") + return df + + # ========================================================================= + # STEP 10: Save to BigQuery # ========================================================================= def _save(self, df: pd.DataFrame) -> int: table_name = "agg_indicator_norm" self.logger.info("\n" + "=" * 80) - self.logger.info(f"STEP 6: SAVE -> [Gold] {table_name}") + self.logger.info(f"STEP 10: SAVE -> [Gold] {table_name}") self.logger.info("=" * 80) out = df[[ @@ -484,6 +575,7 @@ class IndicatorNormAggregator: "country_name", "indicator_id", "indicator_name", + "unit", "direction", "pillar_id", "pillar_name", @@ -491,8 +583,9 @@ class IndicatorNormAggregator: "value", "norm_value", "norm_score_1_100", - "rank_in_indicator_year", - "rank_in_country_year", + "yoy_value", + "yoy_norm_value", + "performance", ]].copy() out = out.sort_values( @@ -505,6 +598,7 @@ class IndicatorNormAggregator: out["country_name"] = out["country_name"].astype(str) out["indicator_id"] = out["indicator_id"].astype(int) out["indicator_name"] = out["indicator_name"].astype(str) + out["unit"] = out["unit"].astype(str) out["direction"] = out["direction"].astype(str) out["pillar_id"] = out["pillar_id"].astype(int) out["pillar_name"] = out["pillar_name"].astype(str) @@ -512,12 +606,9 @@ class IndicatorNormAggregator: out["value"] = out["value"].astype(float) out["norm_value"] = out["norm_value"].astype(float) out["norm_score_1_100"] = out["norm_score_1_100"].astype(float) - out["rank_in_indicator_year"] = pd.to_numeric( - out["rank_in_indicator_year"], errors="coerce" - ).astype("Int64") - out["rank_in_country_year"] = pd.to_numeric( - out["rank_in_country_year"], errors="coerce" - ).astype("Int64") + out["yoy_value"] = pd.to_numeric(out["yoy_value"], errors="coerce").astype(float) + out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float) + out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") self.logger.info(f" Columns : {list(out.columns)}") self.logger.info(f" Total rows : {len(out):,}") @@ -525,22 +616,25 @@ class IndicatorNormAggregator: self.logger.info(f" Indicators : {out['indicator_id'].nunique()}") self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}") self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}") + self.logger.info(f" Performance: {dict(out['performance'].value_counts())}") schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("rank_in_indicator_year", "INTEGER", mode="NULLABLE"), - bigquery.SchemaField("rank_in_country_year", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_norm_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -561,12 +655,15 @@ class IndicatorNormAggregator: "rows_loaded" : rows_loaded, "completeness_pct" : 100.0, "config_snapshot" : json.dumps({ - "sdgs_start_year" : self.sdgs_start_year, - "sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS), - "layer" : "gold", - "normalization" : "per_indicator_global_minmax", - "direction_handling" : "lower_better_inverted", - "framework_logic" : ( + "sdgs_start_year" : self.sdgs_start_year, + "sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS), + "layer" : "gold", + "normalization" : "per_indicator_global_minmax", + "direction_handling" : "lower_better_inverted", + "yoy_columns" : ["yoy_value", "yoy_norm_value"], + "performance_threshold": _PERFORMANCE_THRESHOLD, + "unit_source" : "dim_indicator", + "framework_logic" : ( "SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, " "SDGs if year >= sdgs_start_year. " "Non-SDG_ONLY: always MDGs." @@ -584,15 +681,14 @@ class IndicatorNormAggregator: return rows_loaded # ========================================================================= - # STEP 7: Summary log + # STEP 11: Summary log # ========================================================================= def _log_summary(self, df: pd.DataFrame): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 7: SUMMARY") + self.logger.info("STEP 11: SUMMARY") self.logger.info("=" * 80) - # Per framework & year summary = ( df.groupby(["framework", "year"]) .agg( @@ -613,9 +709,27 @@ class IndicatorNormAggregator: f"{r['avg_score']:.2f}" ) - # Top 5 & Bottom 5 indikator (rata-rata norm_score_1_100) + # Performance summary per framework + self.logger.info("\n Performance summary per Framework:") + perf_fw = ( + df[df["performance"].notna()] + .groupby(["framework", "performance"]) + .size() + .reset_index(name="count") + ) + for fw in perf_fw["framework"].unique(): + sub = perf_fw[perf_fw["framework"] == fw] + total = sub["count"].sum() + self.logger.info(f" [{fw}]") + for _, r in sub.iterrows(): + self.logger.info( + f" {r['performance']:<6}: {int(r['count']):,} " + f"({r['count']/total*100:.1f}%)" + ) + + # Top 5 & Bottom 5 indikator ind_avg = ( - df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"]) + df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"]) ["norm_score_1_100"].mean() .reset_index() .sort_values("norm_score_1_100", ascending=False) @@ -625,18 +739,20 @@ class IndicatorNormAggregator: "\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):" ) for _, r in ind_avg.head(5).iterrows(): - tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + unit = f"[{r['unit']}]" if r["unit"] else "" self.logger.info( - f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} " - f"{r['norm_score_1_100']:.2f} {tag}" + f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} " + f"{r['norm_score_1_100']:.2f} {tag} {unit}" ) self.logger.info("\n BOTTOM 5 Indicators:") for _, r in ind_avg.tail(5).iterrows(): - tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + unit = f"[{r['unit']}]" if r["unit"] else "" self.logger.info( - f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} " - f"{r['norm_score_1_100']:.2f} {tag}" + f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} " + f"{r['norm_score_1_100']:.2f} {tag} {unit}" ) # Indikator per pillar @@ -662,14 +778,19 @@ class IndicatorNormAggregator: self.logger.info("\n" + "=" * 80) self.logger.info("INDICATOR NORM AGGREGATION") self.logger.info(" Source : fact_asean_food_security_selected") + self.logger.info(" Dim : dim_indicator (unit)") self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") self.logger.info("=" * 80) self.load_data() + self.load_units() + self._merge_unit() self.sdgs_start_year = self._detect_sdgs_start_year() self._assign_framework() - df_normed = self._compute_norm_values() - df_final = self._compute_scores_and_ranks(df_normed) + df_normed = self._compute_norm_values() + df_yoy = self._compute_yoy_columns(df_normed) + df_scored = self._compute_scores(df_yoy) + df_final = self._assign_performance(df_scored) rows_loaded = self._save(df_final) self.pipeline_metadata["rows_loaded"] = rows_loaded self._log_summary(df_final) @@ -717,6 +838,7 @@ if __name__ == "__main__": print("=" * 80) print("INDICATOR NORM AGGREGATION -> fs_asean_gold") print(" Source : fact_asean_food_security_selected") + print(" Dim : dim_indicator (unit)") print(" Output : agg_indicator_norm") print("=" * 80)