finish etl code

This commit is contained in:
Debby
2026-04-06 16:37:05 +07:00
parent 5313039b50
commit cebb6b88eb

View File

@@ -17,16 +17,24 @@ Framework Classification Logic:
* "SDGs" untuk year >= sdgs_start_year
- Indikator yang TIDAK ada dalam SDG_ONLY_KEYWORDS selalu "MDGs".
YoY Logic:
- yoy_value : selisih absolut value vs tahun sebelumnya (per indikator, negara)
- yoy_norm_value : selisih absolut norm_value vs tahun sebelumnya
Performance Label Logic:
- performance : "Good" jika norm_score_1_100 >= 60, "Bad" jika < 60, null jika null
Output Schema (agg_indicator_norm):
year, country_id, country_name,
indicator_id, indicator_name, direction,
indicator_id, indicator_name, unit, direction,
pillar_id, pillar_name,
framework, -- "MDGs" | "SDGs"
value, -- raw value asli
norm_value, -- 0-1, direction sudah diperhitungkan
norm_score_1_100, -- scaled 1-100 (global per indikator)
rank_in_indicator_year, -- rank negara di dalam satu indikator & tahun
rank_in_country_year -- rank indikator di dalam satu negara & tahun
yoy_value, -- perubahan absolut value YoY
yoy_norm_value, -- perubahan absolut norm_value YoY
performance -- "Good" | "Bad" | null
"""
import pandas as pd
@@ -85,7 +93,6 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([
_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS)
# FIES-specific keywords untuk deteksi sdgs_start_year
# (indikator yang HANYA muncul setelah SDGs era dimulai)
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
@@ -101,6 +108,9 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({
"positive", "higher_better", "higher_is_better",
})
# Threshold performance label
_PERFORMANCE_THRESHOLD: float = 60.0
# =============================================================================
# PURE HELPERS
@@ -133,6 +143,37 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S
return pd.Series(result, index=series.index)
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
"""
Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year.
Kolom yang ditambahkan:
yoy_value : value - value_prev
yoy_norm_value : norm_value - norm_value_prev
Baris pertama tiap grup selalu null (tidak ada tahun sebelumnya).
"""
df = df.sort_values("year").copy()
df["value_prev"] = df["value"].shift(1)
df["norm_value_prev"] = df["norm_value"].shift(1)
df["yoy_value"] = np.where(
df["value"].notna() & df["value_prev"].notna(),
df["value"] - df["value_prev"],
np.nan,
)
df["yoy_norm_value"] = np.where(
df["norm_value"].notna() & df["norm_value_prev"].notna(),
df["norm_value"] - df["norm_value_prev"],
np.nan,
)
df = df.drop(columns=["value_prev", "norm_value_prev"])
return df
# =============================================================================
# MAIN CLASS
# =============================================================================
@@ -144,12 +185,15 @@ class IndicatorNormAggregator:
Alur:
1. Load fact_asean_food_security_selected
2. Deteksi sdgs_start_year (tahun pertama FIES hadir di data)
3. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label
4. Hitung norm_value per indikator (direction-aware, 0-1)
5. Scale ke 1-100 per indikator (global)
6. Hitung rank_in_indicator_year & rank_in_country_year
7. Simpan ke BigQuery
2. Load dim_indicator -> ambil kolom unit
3. Merge unit ke df
4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data)
5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label
6. Hitung norm_value per indikator (direction-aware, 0-1)
7. Hitung YoY per (indicator_id, country_id)
8. Scale ke 1-100 per indikator (global)
9. Assign performance label (Good/Bad)
10. Simpan ke BigQuery
"""
def __init__(self, client: bigquery.Client):
@@ -158,6 +202,7 @@ class IndicatorNormAggregator:
self.logger.propagate = False
self.df = None
self.df_unit = None
self.sdgs_start_year = None
self.pipeline_start = None
@@ -169,7 +214,7 @@ class IndicatorNormAggregator:
}
# =========================================================================
# STEP 1: Load
# STEP 1: Load fact table
# =========================================================================
def load_data(self):
@@ -205,22 +250,80 @@ class IndicatorNormAggregator:
)
# =========================================================================
# STEP 2: Deteksi sdgs_start_year
# STEP 2: Load unit dari dim_indicator
# =========================================================================
def load_units(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: LOAD UNIT — dim_indicator")
self.logger.info("=" * 80)
dim = read_from_bigquery(self.client, "dim_indicator", layer="gold")
if "indicator_id" not in dim.columns or "unit" not in dim.columns:
raise ValueError(
f"dim_indicator harus punya kolom 'indicator_id' dan 'unit'. "
f"Kolom tersedia: {list(dim.columns)}"
)
self.df_unit = (
dim[["indicator_id", "unit"]]
.drop_duplicates(subset=["indicator_id"])
.copy()
)
self.df_unit["indicator_id"] = self.df_unit["indicator_id"].astype(int)
self.df_unit["unit"] = self.df_unit["unit"].fillna("").astype(str)
n_missing_unit = self.df_unit["unit"].eq("").sum()
self.logger.info(f" dim_indicator rows (unique indicator_id): {len(self.df_unit):,}")
self.logger.info(f" Indicator dengan unit kosong : {n_missing_unit}")
fact_ids = set(self.df["indicator_id"].astype(int).unique())
dim_ids = set(self.df_unit["indicator_id"].unique())
orphan = fact_ids - dim_ids
if orphan:
self.logger.warning(
f" [WARNING] {len(orphan)} indicator_id di fact tidak ditemukan di "
f"dim_indicator (unit akan diisi ''): {sorted(orphan)}"
)
# =========================================================================
# STEP 3: Merge unit ke df
# =========================================================================
def _merge_unit(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: MERGE UNIT -> fact df")
self.logger.info("=" * 80)
before = len(self.df)
self.df = self.df.merge(self.df_unit, on="indicator_id", how="left")
self.df["unit"] = self.df["unit"].fillna("").astype(str)
after = len(self.df)
assert before == after, (
f"Row count berubah setelah merge unit: {before} -> {after}"
)
n_empty = self.df["unit"].eq("").sum()
self.logger.info(
f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}"
)
unique_units = self.df["unit"].value_counts().to_dict()
self.logger.info(" Distribusi unit (top 10):")
for u, cnt in list(unique_units.items())[:10]:
label = repr(u) if u == "" else u
self.logger.info(f" {label:<30}: {cnt:,} rows")
# =========================================================================
# STEP 4: Deteksi sdgs_start_year
# =========================================================================
def _detect_sdgs_start_year(self) -> int:
"""
sdgs_start_year = tahun pertama FIES hadir di data.
FIES = indikator yang ada di _FIES_DETECTION_LOWER.
Fallback ke metode gap-terbesar pada min_year distribusi per indikator
jika FIES tidak ditemukan.
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETECT sdgs_start_year (first FIES year)")
self.logger.info("STEP 4: DETECT sdgs_start_year (first FIES year)")
self.logger.info("=" * 80)
# Metode 1: Explicit FIES detection
fies_rows = self.df[
self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER)
]
@@ -234,7 +337,6 @@ class IndicatorNormAggregator:
self.logger.info(f" - {nm[:60]} (first year: {min_y})")
return sdgs_start
# Fallback: gap-terbesar
self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar")
ind_min_year = (
self.df.groupby("indicator_id")["year"]
@@ -262,58 +364,30 @@ class IndicatorNormAggregator:
return sdgs_start
# =========================================================================
# STEP 3: Assign framework
# STEP 5: Assign framework
# =========================================================================
def _assign_framework(self):
"""
Tambahkan kolom 'framework' ke self.df.
Aturan per baris:
- Indikator TIDAK di SDG_ONLY_KEYWORDS:
framework = "MDGs" (selalu, semua tahun)
- Indikator DI SDG_ONLY_KEYWORDS:
year < sdgs_start_year -> framework = "MDGs"
year >= sdgs_start_year -> framework = "SDGs"
Contoh dual-label (indicator "prevalence of undernourishment"):
Jika data ada dari 2013 dan sdgs_start_year = 2019:
- Baris 2013-2018: framework = "MDGs" (masuk era MDGs)
- Baris 2019-dst : framework = "SDGs" (masuk era SDGs)
Sehingga indikator ini muncul di kedua framework tanpa duplikasi baris.
Contoh FIES-only (indicator "prevalence of severe food insecurity"):
Data baru ada mulai 2019 (= sdgs_start_year):
- Semua baris: framework = "SDGs"
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: ASSIGN FRAMEWORK PER BARIS")
self.logger.info("STEP 5: ASSIGN FRAMEWORK PER BARIS")
self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
self.logger.info("=" * 80)
df = self.df.copy()
# Flag apakah indikator ada di SDG_ONLY_KEYWORDS
df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
df["framework"] = "MDGs"
# Default semua MDGs
df["framework"] = "MDGs"
# SDG_ONLY + year >= sdgs_start_year -> SDGs
mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year)
df.loc[mask_sdgs, "framework"] = "SDGs"
# Drop helper column
df = df.drop(columns=["_is_sdg_kw"])
# ---- Logging ----
fw_dist = df["framework"].value_counts()
self.logger.info("\n Framework distribution (rows):")
for fw, cnt in fw_dist.items():
self.logger.info(f" {fw:<6}: {cnt:,} rows")
# Cek berapa indikator punya dual-framework
dual = (
df.groupby("indicator_id")["framework"]
.nunique()
@@ -336,29 +410,15 @@ class IndicatorNormAggregator:
f" SDGs years: {sdgs_yrs}"
)
self.logger.info(
f"\n Indikator SDGs only (semua tahun = SDGs): "
f"{len(dual[(dual['n_frameworks'] == 1)].merge(df[df['framework'] == 'SDGs'][['indicator_id']].drop_duplicates(), on='indicator_id'))}"
)
self.df = df
# =========================================================================
# STEP 4: Hitung norm_value per indikator (direction-aware)
# STEP 6: Hitung norm_value per indikator (direction-aware)
# =========================================================================
def _compute_norm_values(self) -> pd.DataFrame:
"""
Normalisasi per indikator secara global (semua tahun & negara):
norm_value = (raw - min) / (max - min) [higher_better]
norm_value = 1 - (raw - min) / (max - min) [lower_better]
Normalisasi dilakukan satu kali per indicator_id,
mencakup SEMUA baris (MDGs + SDGs dari indikator yang sama)
agar skor konsisten antar framework.
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)")
self.logger.info("STEP 6: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)")
self.logger.info("=" * 80)
df = self.df.copy()
@@ -400,40 +460,57 @@ class IndicatorNormAggregator:
df_normed = pd.concat(norm_parts, ignore_index=True)
n_ind_computed = df_normed["indicator_id"].nunique()
self.logger.info(f" norm_value computed: {n_ind_computed} indicators")
self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators")
self.logger.info(
f" norm_value range : "
f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}"
)
self.logger.info(
f" norm_value nulls : {df_normed['norm_value'].isna().sum()}"
)
self.logger.info(f" norm_value nulls : {df_normed['norm_value'].isna().sum()}")
return df_normed
# =========================================================================
# STEP 5: Scale ke 1-100, hitung rank
# STEP 7: Hitung YoY per (indicator_id, country_id)
# =========================================================================
def _compute_scores_and_ranks(self, df: pd.DataFrame) -> pd.DataFrame:
"""
norm_score_1_100:
Scale norm_value ke 1-100 secara global PER INDIKATOR
(semua tahun & negara dalam satu indikator di-scale bersama).
rank_in_indicator_year:
Rank negara dalam satu (indicator_id, year).
rank=1 -> negara dengan norm_score terbaik untuk indikator tsb di tahun tsb.
rank_in_country_year:
Rank indikator dalam satu (country_id, year).
rank=1 -> indikator dengan norm_score terbaik untuk negara tsb di tahun tsb.
"""
def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame:
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: SCALE TO 1-100 & COMPUTE RANKS")
self.logger.info("STEP 7: COMPUTE YoY COLUMNS (per indicator, per country)")
self.logger.info("=" * 80)
parts = []
groups = df.groupby(["indicator_id", "country_id"], sort=False)
self.logger.info(f" Processing {groups.ngroups:,} (indicator x country) groups...")
for (ind_id, country_id), grp in groups:
parts.append(_compute_yoy(grp))
df_out = pd.concat(parts, ignore_index=True)
self.logger.info(
f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}"
)
self.logger.info(
f" yoy_value range : "
f"{df_out['yoy_value'].min():.4f} - {df_out['yoy_value'].max():.4f}"
)
self.logger.info(
f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}"
)
self.logger.info(
f" yoy_norm_value range: "
f"{df_out['yoy_norm_value'].min():.4f} - {df_out['yoy_norm_value'].max():.4f}"
)
return df_out
# =========================================================================
# STEP 8: Scale ke 1-100
# =========================================================================
def _compute_scores(self, df: pd.DataFrame) -> pd.DataFrame:
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: SCALE TO 1-100")
self.logger.info("=" * 80)
# Scale per indikator
score_parts = []
for ind_id, grp in df.groupby("indicator_id"):
grp = grp.copy()
@@ -441,41 +518,55 @@ class IndicatorNormAggregator:
score_parts.append(grp)
df = pd.concat(score_parts, ignore_index=True)
# rank_in_indicator_year: rank negara per (indicator, year)
df["rank_in_indicator_year"] = (
df.groupby(["indicator_id", "year"])["norm_score_1_100"]
.rank(method="min", ascending=False)
.astype("Int64")
)
# rank_in_country_year: rank indikator per (country, year)
df["rank_in_country_year"] = (
df.groupby(["country_id", "year"])["norm_score_1_100"]
.rank(method="min", ascending=False)
.astype("Int64")
)
self.logger.info(
f" norm_score_1_100 range : "
f" norm_score_1_100 range: "
f"{df['norm_score_1_100'].min():.2f} - {df['norm_score_1_100'].max():.2f}"
)
self.logger.info(
f" rank_in_indicator_year max: {df['rank_in_indicator_year'].max()}"
)
self.logger.info(
f" rank_in_country_year max : {df['rank_in_country_year'].max()}"
)
return df
# =========================================================================
# STEP 6: Save to BigQuery
# STEP 9: Assign performance label
# =========================================================================
def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame:
"""
performance = "Good" jika norm_score_1_100 >= 60
= "Bad" jika norm_score_1_100 < 60
= null jika norm_score_1_100 null
"""
self.logger.info("\n" + "=" * 80)
self.logger.info(
f"STEP 9: ASSIGN PERFORMANCE LABEL "
f"(threshold >= {_PERFORMANCE_THRESHOLD} -> Good)"
)
self.logger.info("=" * 80)
df = df.copy()
df["performance"] = pd.NA
has_score = df["norm_score_1_100"].notna()
df.loc[has_score & (df["norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
df.loc[has_score & (df["norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
n_good = (df["performance"] == "Good").sum()
n_bad = (df["performance"] == "Bad").sum()
n_null = df["performance"].isna().sum()
total = len(df)
self.logger.info(f" Good : {n_good:,} ({n_good/total*100:.1f}%)")
self.logger.info(f" Bad : {n_bad:,} ({n_bad/total*100:.1f}%)")
self.logger.info(f" Null : {n_null:,} ({n_null/total*100:.1f}%)")
return df
# =========================================================================
# STEP 10: Save to BigQuery
# =========================================================================
def _save(self, df: pd.DataFrame) -> int:
table_name = "agg_indicator_norm"
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 6: SAVE -> [Gold] {table_name}")
self.logger.info(f"STEP 10: SAVE -> [Gold] {table_name}")
self.logger.info("=" * 80)
out = df[[
@@ -484,6 +575,7 @@ class IndicatorNormAggregator:
"country_name",
"indicator_id",
"indicator_name",
"unit",
"direction",
"pillar_id",
"pillar_name",
@@ -491,8 +583,9 @@ class IndicatorNormAggregator:
"value",
"norm_value",
"norm_score_1_100",
"rank_in_indicator_year",
"rank_in_country_year",
"yoy_value",
"yoy_norm_value",
"performance",
]].copy()
out = out.sort_values(
@@ -505,6 +598,7 @@ class IndicatorNormAggregator:
out["country_name"] = out["country_name"].astype(str)
out["indicator_id"] = out["indicator_id"].astype(int)
out["indicator_name"] = out["indicator_name"].astype(str)
out["unit"] = out["unit"].astype(str)
out["direction"] = out["direction"].astype(str)
out["pillar_id"] = out["pillar_id"].astype(int)
out["pillar_name"] = out["pillar_name"].astype(str)
@@ -512,12 +606,9 @@ class IndicatorNormAggregator:
out["value"] = out["value"].astype(float)
out["norm_value"] = out["norm_value"].astype(float)
out["norm_score_1_100"] = out["norm_score_1_100"].astype(float)
out["rank_in_indicator_year"] = pd.to_numeric(
out["rank_in_indicator_year"], errors="coerce"
).astype("Int64")
out["rank_in_country_year"] = pd.to_numeric(
out["rank_in_country_year"], errors="coerce"
).astype("Int64")
out["yoy_value"] = pd.to_numeric(out["yoy_value"], errors="coerce").astype(float)
out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float)
out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string")
self.logger.info(f" Columns : {list(out.columns)}")
self.logger.info(f" Total rows : {len(out):,}")
@@ -525,22 +616,25 @@ class IndicatorNormAggregator:
self.logger.info(f" Indicators : {out['indicator_id'].nunique()}")
self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")
self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}")
self.logger.info(f" Performance: {dict(out['performance'].value_counts())}")
schema = [
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("rank_in_indicator_year", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("rank_in_country_year", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_norm_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
@@ -561,12 +655,15 @@ class IndicatorNormAggregator:
"rows_loaded" : rows_loaded,
"completeness_pct" : 100.0,
"config_snapshot" : json.dumps({
"sdgs_start_year" : self.sdgs_start_year,
"sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS),
"layer" : "gold",
"normalization" : "per_indicator_global_minmax",
"direction_handling" : "lower_better_inverted",
"framework_logic" : (
"sdgs_start_year" : self.sdgs_start_year,
"sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS),
"layer" : "gold",
"normalization" : "per_indicator_global_minmax",
"direction_handling" : "lower_better_inverted",
"yoy_columns" : ["yoy_value", "yoy_norm_value"],
"performance_threshold": _PERFORMANCE_THRESHOLD,
"unit_source" : "dim_indicator",
"framework_logic" : (
"SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, "
"SDGs if year >= sdgs_start_year. "
"Non-SDG_ONLY: always MDGs."
@@ -584,15 +681,14 @@ class IndicatorNormAggregator:
return rows_loaded
# =========================================================================
# STEP 7: Summary log
# STEP 11: Summary log
# =========================================================================
def _log_summary(self, df: pd.DataFrame):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: SUMMARY")
self.logger.info("STEP 11: SUMMARY")
self.logger.info("=" * 80)
# Per framework & year
summary = (
df.groupby(["framework", "year"])
.agg(
@@ -613,9 +709,27 @@ class IndicatorNormAggregator:
f"{r['avg_score']:.2f}"
)
# Top 5 & Bottom 5 indikator (rata-rata norm_score_1_100)
# Performance summary per framework
self.logger.info("\n Performance summary per Framework:")
perf_fw = (
df[df["performance"].notna()]
.groupby(["framework", "performance"])
.size()
.reset_index(name="count")
)
for fw in perf_fw["framework"].unique():
sub = perf_fw[perf_fw["framework"] == fw]
total = sub["count"].sum()
self.logger.info(f" [{fw}]")
for _, r in sub.iterrows():
self.logger.info(
f" {r['performance']:<6}: {int(r['count']):,} "
f"({r['count']/total*100:.1f}%)"
)
# Top 5 & Bottom 5 indikator
ind_avg = (
df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"])
df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"])
["norm_score_1_100"].mean()
.reset_index()
.sort_values("norm_score_1_100", ascending=False)
@@ -625,18 +739,20 @@ class IndicatorNormAggregator:
"\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):"
)
for _, r in ind_avg.head(5).iterrows():
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
unit = f"[{r['unit']}]" if r["unit"] else ""
self.logger.info(
f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} "
f"{r['norm_score_1_100']:.2f} {tag}"
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
)
self.logger.info("\n BOTTOM 5 Indicators:")
for _, r in ind_avg.tail(5).iterrows():
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
unit = f"[{r['unit']}]" if r["unit"] else ""
self.logger.info(
f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} "
f"{r['norm_score_1_100']:.2f} {tag}"
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
)
# Indikator per pillar
@@ -662,14 +778,19 @@ class IndicatorNormAggregator:
self.logger.info("\n" + "=" * 80)
self.logger.info("INDICATOR NORM AGGREGATION")
self.logger.info(" Source : fact_asean_food_security_selected")
self.logger.info(" Dim : dim_indicator (unit)")
self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold")
self.logger.info("=" * 80)
self.load_data()
self.load_units()
self._merge_unit()
self.sdgs_start_year = self._detect_sdgs_start_year()
self._assign_framework()
df_normed = self._compute_norm_values()
df_final = self._compute_scores_and_ranks(df_normed)
df_normed = self._compute_norm_values()
df_yoy = self._compute_yoy_columns(df_normed)
df_scored = self._compute_scores(df_yoy)
df_final = self._assign_performance(df_scored)
rows_loaded = self._save(df_final)
self.pipeline_metadata["rows_loaded"] = rows_loaded
self._log_summary(df_final)
@@ -717,6 +838,7 @@ if __name__ == "__main__":
print("=" * 80)
print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
print(" Source : fact_asean_food_security_selected")
print(" Dim : dim_indicator (unit)")
print(" Output : agg_indicator_norm")
print("=" * 80)