airflow-coolify/scripts/bigquery_aggraget_fact_selected_layer.py
2026-04-07 23:10:34 +07:00


"""
BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION
Table 1: agg_indicator_norm      -> fs_asean_gold
Table 2: agg_narrative_indicator -> fs_asean_gold
=============================================================================
agg_indicator_norm
=============================================================================
Purpose:
    Compute norm_value per indicator, per country, per year, so that the
    performance of each indicator can be inspected individually. Direction is
    already accounted for: lower_better indicators are inverted, so a higher
    norm_value always means a better outcome.
Framework Classification Logic:
    - Every indicator is labelled "MDGs" by default.
    - Indicators listed in SDG_ONLY_KEYWORDS are labelled "SDGs" from
      sdgs_start_year onwards (the first year FIES data appears, detected
      automatically).
    - Indicators that ALREADY EXIST before sdgs_start_year AND are listed in
      SDG_ONLY_KEYWORDS carry TWO framework labels:
        * "MDGs" for year < sdgs_start_year
        * "SDGs" for year >= sdgs_start_year
    - Indicators NOT listed in SDG_ONLY_KEYWORDS are always "MDGs".
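Illustration (not part of the pipeline): a minimal sketch of the framework
labelling rule above. The keyword set, the start year 2014, and the
"Cereal import dependency ratio" row are hypothetical values chosen only
for the example.

```python
import pandas as pd

# Hypothetical inputs, for illustration only.
sdg_only = frozenset({"prevalence of undernourishment (percent) (3-year average)"})
sdgs_start_year = 2014

df = pd.DataFrame({
    "indicator_name": [
        "Prevalence of undernourishment (percent) (3-year average)",
        "Prevalence of undernourishment (percent) (3-year average)",
        "Cereal import dependency ratio (percent)",
    ],
    "year": [2010, 2016, 2016],
})

# Case-insensitive keyword match; only rows at or after the start year
# are relabelled, everything else keeps the default "MDGs".
is_sdg_kw = df["indicator_name"].str.lower().str.strip().isin(sdg_only)
df["framework"] = "MDGs"
df.loc[is_sdg_kw & (df["year"] >= sdgs_start_year), "framework"] = "SDGs"
```

The first indicator ends up with both labels (MDGs in 2010, SDGs in 2016),
which is the dual-label case described above.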
YoY Logic:
    - yoy_value      : absolute difference in value vs. the previous available
                       year (per indicator, per country)
    - yoy_norm_value : absolute difference in norm_value vs. the previous
                       available year
Performance Label Logic:
    - performance    : "Good" if norm_score_1_100 >= 60, "Bad" if < 60,
                       null if the score is null
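Illustration (not part of the pipeline): a small sketch of the YoY and
performance rules on made-up numbers. It uses groupby().diff() as a
shorthand; the module itself uses shift(1) and a subtraction, which gives
the same result on sorted data.

```python
import numpy as np
import pandas as pd

# Made-up data for one indicator in one country.
df = pd.DataFrame({
    "indicator_id": [1, 1, 1],
    "country_id": [10, 10, 10],
    "year": [2015, 2016, 2017],
    "value": [8.0, 6.5, 7.0],
    "norm_score_1_100": [40.0, 75.0, np.nan],
})

# YoY: difference vs. the previous row within each (indicator, country) group.
df = df.sort_values(["indicator_id", "country_id", "year"])
df["yoy_value"] = df.groupby(["indicator_id", "country_id"])["value"].diff()

# Performance: threshold at 60; rows without a score stay null.
df["performance"] = pd.NA
has_score = df["norm_score_1_100"].notna()
df.loc[has_score & (df["norm_score_1_100"] >= 60.0), "performance"] = "Good"
df.loc[has_score & (df["norm_score_1_100"] < 60.0), "performance"] = "Bad"
```

The first year of each group has no predecessor, so its yoy_value is null.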
Output Schema (agg_indicator_norm):
    year, country_id, country_name,
    indicator_id, indicator_name, unit, direction,
    pillar_id, pillar_name,
    framework,         -- "MDGs" | "SDGs"
    value,             -- original raw value
    norm_value,        -- 0-1, direction already accounted for
    norm_score_1_100,  -- scaled 1-100 (global per indicator)
    yoy_value,         -- absolute YoY change in value
    yoy_norm_value,    -- absolute YoY change in norm_value
    performance        -- "Good" | "Bad" | null
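Illustration (not part of the pipeline): a sketch of how norm_value and
norm_score_1_100 relate. minmax_norm and to_score are hypothetical helper
names for this example; to_score is a simplification that assumes
norm_value already spans 0-1 within the indicator.

```python
import numpy as np

def minmax_norm(values, invert=False):
    # Min-max normalise to 0-1, ignoring NaN; flip when lower is better.
    arr = np.asarray(values, dtype=float)
    valid = ~np.isnan(arr)
    out = np.full(arr.shape, np.nan)
    if valid.sum() == 0:
        return out
    v_min, v_max = arr[valid].min(), arr[valid].max()
    if v_min == v_max:
        out[valid] = 0.5  # constant series: neutral midpoint
    else:
        out[valid] = (arr[valid] - v_min) / (v_max - v_min)
    return np.where(valid, 1.0 - out, np.nan) if invert else out

def to_score(norm, lo=1.0, hi=100.0):
    # Linear map from 0-1 onto lo-hi; NaN propagates.
    return lo + np.asarray(norm, dtype=float) * (hi - lo)

# A lower_better indicator: the smallest raw value gets the best score.
norm = minmax_norm([2.0, 4.0, np.nan, 6.0], invert=True)
score = to_score(norm)
```

Here the raw value 2.0 maps to norm_value 1.0 and score 100, while 6.0
maps to norm_value 0.0 and score 1.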
=============================================================================
agg_narrative_indicator
=============================================================================
Purpose:
    Generate an automatic one-paragraph narrative per indicator per year at
    the ASEAN level (average across all ASEAN countries). It runs
    automatically after agg_indicator_norm finishes, in the same pipeline.
Granularity:
    year x indicator_id (ASEAN level, not per country)
Output Schema (agg_narrative_indicator):
    year, indicator_id, indicator_name, unit, direction,
    pillar_name, framework,
    avg_value,             -- ASEAN average of value
    avg_norm_score_1_100,  -- ASEAN average of norm_score_1_100
    performance,           -- Good | Bad | null
    yoy_avg_value,         -- change in avg_value vs. the previous year
    n_countries,           -- number of countries with data for this year
    narrative              -- one automatically generated paragraph
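Illustration (not part of the pipeline): one way the ASEAN-level rows
described above could be derived from per-country rows. This is an
assumption about the aggregation, written only from the schema; the data
are made up.

```python
import pandas as pd

# Made-up per-country rows for a single indicator.
df = pd.DataFrame({
    "year": [2019, 2019, 2020, 2020],
    "indicator_id": [1, 1, 1, 1],
    "country_id": [10, 20, 10, 20],
    "value": [4.0, 6.0, 3.0, 5.0],
    "norm_score_1_100": [70.0, 40.0, 80.0, 60.0],
})

# Collapse countries: one row per (indicator_id, year).
asean = (
    df.groupby(["indicator_id", "year"], as_index=False)
    .agg(
        avg_value=("value", "mean"),
        avg_norm_score_1_100=("norm_score_1_100", "mean"),
        n_countries=("country_id", "nunique"),
    )
    .sort_values(["indicator_id", "year"])
)

# YoY on the regional average, then the same 60-point threshold.
asean["yoy_avg_value"] = asean.groupby("indicator_id")["avg_value"].diff()
asean["performance"] = asean["avg_norm_score_1_100"].map(
    lambda s: pd.NA if pd.isna(s) else ("Good" if s >= 60.0 else "Bad")
)
```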
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
import json
from scripts.bigquery_config import get_bigquery_client
from scripts.bigquery_helpers import (
    log_update,
    load_to_bigquery,
    read_from_bigquery,
    setup_logging,
    save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# SDG-ONLY KEYWORD SET
# =============================================================================
SDG_ONLY_KEYWORDS: frozenset = frozenset([
# TARGET 2.1.1 - Undernourishment
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 - Food Insecurity (FIES)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1 - Stunting
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 - Wasting
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
# TARGET 2.2.2 - Overweight (children)
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 - Anaemia
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# Lowercase set for case-insensitive matching
_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS)
# FIES-specific keywords used to detect sdgs_start_year
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
])
_FIES_DETECTION_LOWER: frozenset = frozenset(k.lower() for k in _FIES_DETECTION_KEYWORDS)
DIRECTION_INVERT_KEYWORDS = frozenset({
"negative", "lower_better", "lower_is_better", "inverse", "neg",
})
DIRECTION_POSITIVE_KEYWORDS = frozenset({
"positive", "higher_better", "higher_is_better",
})
# Threshold for the performance label (score >= threshold -> "Good")
_PERFORMANCE_THRESHOLD: float = 60.0
# =============================================================================
# PURE HELPERS — agg_indicator_norm
# =============================================================================
def _should_invert(direction: str, logger=None, context: str = "") -> bool:
    """Return True when lower is better; unknown directions default to False."""
    d = str(direction).lower().strip()
    if d in DIRECTION_INVERT_KEYWORDS:
        return True
    if d in DIRECTION_POSITIVE_KEYWORDS:
        return False
    if logger:
        logger.warning(
            f" [DIRECTION WARNING] Unknown direction '{direction}' "
            f"{'(' + context + ')' if context else ''}. Defaulting to positive (no invert)."
        )
    return False


def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series:
    """Min-max scale a series to [lo, hi]; NaN stays NaN, constant data maps to the midpoint."""
    values = series.dropna().values
    if len(values) == 0:
        return pd.Series(np.nan, index=series.index)
    v_min, v_max = values.min(), values.max()
    if v_min == v_max:
        # Constant series: midpoint for valid rows only, so NaN rows do not get a score.
        return series.astype(float).where(series.isna(), (lo + hi) / 2.0)
    result = np.full(len(series), np.nan)
    not_nan = series.notna()
    result[not_nan.values] = lo + (series[not_nan].values - v_min) / (v_max - v_min) * (hi - lo)
    return pd.Series(result, index=series.index)


def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute YoY for a single (indicator_id, country_id) group.
    The group is sorted by year here, and each row is compared with the
    previous available row (which may be more than one year earlier if
    years are missing).
    Columns added:
        yoy_value      : value - value_prev
        yoy_norm_value : norm_value - norm_value_prev
    The first row of each group is always null (no previous year).
    """
    df = df.sort_values("year").copy()
    df["value_prev"] = df["value"].shift(1)
    df["norm_value_prev"] = df["norm_value"].shift(1)
    df["yoy_value"] = np.where(
        df["value"].notna() & df["value_prev"].notna(),
        df["value"] - df["value_prev"],
        np.nan,
    )
    df["yoy_norm_value"] = np.where(
        df["norm_value"].notna() & df["norm_value_prev"].notna(),
        df["norm_value"] - df["norm_value_prev"],
        np.nan,
    )
    df = df.drop(columns=["value_prev", "norm_value_prev"])
    return df


# =============================================================================
# PURE HELPERS — agg_narrative_indicator
# =============================================================================
def _is_lower_better(direction: str) -> bool:
    return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS


def _format_value(value: float, unit: str) -> str:
    """Format a value together with its unit."""
    if pd.isna(value):
        return "N/A"
    unit = str(unit).strip() if unit else ""
    if abs(value) >= 1000:
        formatted = f"{value:,.1f}"
    elif abs(value) >= 10:
        formatted = f"{value:.2f}"
    else:
        formatted = f"{value:.3f}"
    return f"{formatted} {unit}".strip()


def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple:
    """
    Return (direction_word, change_desc, is_positive_trend).
    is_positive_trend: True if the change is favourable given the direction.
    """
    unit = str(unit).strip() if unit else ""
    abs_yoy = abs(yoy)
    if abs_yoy >= 1000:
        yoy_str = f"{abs_yoy:,.1f}"
    elif abs_yoy >= 10:
        yoy_str = f"{abs_yoy:.2f}"
    else:
        yoy_str = f"{abs_yoy:.3f}"
    change_desc = f"{yoy_str} {unit}".strip()
    is_positive = (yoy < 0) if lower_better else (yoy > 0)
    direction_word = "decreased by" if yoy < 0 else "increased by"
    return direction_word, change_desc, is_positive


# =============================================================================
# PURE HELPER — narrative builder (per indicator, all years, all countries)
# =============================================================================
def _build_narrative_per_indicator(row: pd.Series) -> str:
    """
    Build one ASEAN-level narrative paragraph for a single indicator,
    summarising the whole period (year_min - year_max) across all countries.
    Columns required in row:
        indicator_name, unit, direction, pillar_name, framework,
        year_min, year_max, n_countries,
        avg_value_first, avg_value_last,
        avg_norm_score_1_100,         -- average over the whole period
        performance,                  -- Good | Bad | null
        n_yoy_total,                  -- total year-on-year transitions
        n_yoy_positive,               -- number of transitions that improved
        best_yoy_from, best_yoy_to,   -- period with the largest improvement
        country_worst, country_best   -- countries with the worst / best values
    """
    ind_name = str(row["indicator_name"]).strip()
    # Treat a missing unit (None / NaN) as empty instead of rendering "nan".
    unit = "" if pd.isna(row["unit"]) else str(row["unit"]).strip()
    direction = str(row["direction"]).strip()
    pillar = str(row["pillar_name"]).strip()
    framework = str(row["framework"]).strip()
    year_min = int(row["year_min"])
    year_max = int(row["year_max"])
    n_countries = int(row["n_countries"])
    avg_score = row["avg_norm_score_1_100"]
    performance = row["performance"]
    avg_first = row["avg_value_first"]
    avg_last = row["avg_value_last"]
    n_yoy_total = int(row["n_yoy_total"]) if not pd.isna(row["n_yoy_total"]) else 0
    n_yoy_positive = int(row["n_yoy_positive"]) if not pd.isna(row["n_yoy_positive"]) else 0
    best_yoy_from = row["best_yoy_from"]
    best_yoy_to = row["best_yoy_to"]
    country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None
    country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None
    lower_better = _is_lower_better(direction)
    direction_label = (
        "lower values indicate better outcomes"
        if lower_better
        else "higher values indicate better outcomes"
    )

    # ---- Sentence 1: indicator identification + coverage ---------------------
    member_str = f"{n_countries} member state{'s' if n_countries != 1 else ''}"
    sentence1 = (
        f"Across ASEAN, {ind_name} under the {framework} framework "
        f"({pillar} pillar) was monitored from {year_min} to {year_max} "
        f"across {member_str}."
    )

    # ---- Sentence 2: overall trend (first → last) ----------------------------
    if not pd.isna(avg_first) and not pd.isna(avg_last):
        diff = avg_last - avg_first
        abs_diff = abs(diff)

        # Number formatting: fewer decimals for larger magnitudes
        def fmt(v):
            if abs(v) >= 1000:
                return f"{v:,.1f}"
            elif abs(v) >= 10:
                return f"{v:.2f}"
            else:
                return f"{v:.3f}"

        first_str = f"{fmt(avg_first)}{' ' + unit if unit else ''}"
        last_str = f"{fmt(avg_last)}{' ' + unit if unit else ''}"
        diff_str = f"{fmt(abs_diff)}{' ' + unit if unit else ''}"
        # Is the trend favourable given the direction?
        is_improving = (diff < 0) if lower_better else (diff > 0)
        trend_word = "improving" if is_improving else "deteriorating"
        verb = "declining" if diff < 0 else "rising"
        sentence2 = (
            f"Since {direction_label}, the region collectively showed "
            f"{'an' if trend_word[0] in 'aeiou' else 'a'} {trend_word} trend, "
            f"with the ASEAN average {verb} from {first_str} in {year_min} "
            f"to {last_str} in {year_max} "
            f"(a cumulative {'reduction' if diff < 0 else 'increase'} of {diff_str})."
        )
    else:
        sentence2 = (
            f"For this indicator, {direction_label}; trend analysis could not be "
            f"performed due to missing data at the start or end of the period."
        )

    # ---- Sentence 3: score + performance -------------------------------------
    if not pd.isna(avg_score):
        score_str = f"{avg_score:.1f} out of 100"
        if performance == "Good":
            sentence3 = (
                f"The regional normalized score averaged {score_str}, "
                f"classified as Good performance."
            )
        elif performance == "Bad":
            sentence3 = (
                f"The regional normalized score averaged {score_str}, "
                f"classified as Bad performance, falling below the 60-point threshold."
            )
        else:
            sentence3 = (
                f"The regional normalized score averaged {score_str}."
            )
    else:
        sentence3 = "The regional normalized performance score could not be assessed."

    # ---- Sentence 4: best and worst countries --------------------------------
    if country_worst and country_best and country_worst != country_best:
        if lower_better:
            worst_label = "highest (most concerning)"
            best_label = "consistently performed best (lowest values)"
        else:
            worst_label = "lowest (most concerning)"
            best_label = "consistently performed best (highest values)"
        sentence4 = (
            f"Among member states, {country_worst} recorded the {worst_label} "
            f"levels throughout the period, while {country_best} {best_label}."
        )
    elif country_best:
        sentence4 = (
            f"Among member states, {country_best} consistently recorded the "
            f"best performance throughout the period."
        )
    else:
        sentence4 = ""

    # ---- Sentence 5: YoY transitions -----------------------------------------
    if n_yoy_total > 0:
        yoy_sentence = (
            f"Year-on-year, the region improved in {n_yoy_positive} out of "
            f"{n_yoy_total} transition{'s' if n_yoy_total > 1 else ''}"
        )
        if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to):
            yoy_sentence += (
                f", with the largest regional gain occurring between "
                f"{int(best_yoy_from)} and {int(best_yoy_to)}."
            )
        else:
            yoy_sentence += "."
    else:
        yoy_sentence = "Insufficient data to assess year-on-year transitions."

    parts = [sentence1, sentence2, sentence3]
    if sentence4:
        parts.append(sentence4)
    parts.append(yoy_sentence)
    return " ".join(parts)


# =============================================================================
# MAIN CLASS
# =============================================================================
class IndicatorNormAggregator:
    """
    Compute norm_value per indicator for all data in
    fact_asean_food_security_selected, then store it in agg_indicator_norm.
    Once that is done, the agg_narrative_indicator pipeline runs automatically.

    agg_indicator_norm flow:
        1. Load fact_asean_food_security_selected
        2. Load dim_indicator -> take the unit column
        3. Merge unit into df
        4. Detect sdgs_start_year (first year FIES appears in the data)
        5. Assign framework per row following the MDGs/SDGs dual-label rule
        6. Compute norm_value per indicator (direction-aware, 0-1)
        7. Compute YoY per (indicator_id, country_id)
        8. Scale to 1-100 per indicator (global)
        9. Assign performance label (Good/Bad)
        10. Save to BigQuery -> agg_indicator_norm
        11. Summary log for agg_indicator_norm

    agg_narrative_indicator flow (continues, reusing the existing df_final):
        12. Aggregate to ASEAN level (year x indicator_id)
        13. Compute YoY of avg_value per indicator
        14. Assign performance based on avg_norm_score
        15. Build a one-paragraph narrative per row
        16. Save to BigQuery -> agg_narrative_indicator
        17. Summary log for agg_narrative_indicator
    """

    def __init__(self, client: bigquery.Client):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        self.df = None
        self.df_unit = None
        self.sdgs_start_year = None
        self.pipeline_start = None
        self.pipeline_metadata = {
            "rows_fetched": 0,
            "rows_loaded": 0,
            "rows_loaded_narrative": 0,
            "start_time": None,
            "end_time": None,
        }

    # =========================================================================
    # STEP 1: Load fact table
    # =========================================================================
    def load_data(self):
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 1: LOAD DATA - fact_asean_food_security_selected")
        self.logger.info("=" * 80)
        self.df = read_from_bigquery(
            self.client, "fact_asean_food_security_selected", layer="gold"
        )
        required = {
            "country_id", "country_name",
            "indicator_id", "indicator_name", "direction",
            "pillar_id", "pillar_name",
            "year", "value",
        }
        missing = required - set(self.df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        n_null = self.df["direction"].isna().sum()
        if n_null > 0:
            self.logger.warning(f" {n_null} rows with NULL direction -> filled with 'positive'")
            self.df["direction"] = self.df["direction"].fillna("positive")
        self.pipeline_metadata["rows_fetched"] = len(self.df)
        self.logger.info(f" Rows : {len(self.df):,}")
        self.logger.info(f" Countries : {self.df['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}")
        self.logger.info(
            f" Years : {int(self.df['year'].min())} - {int(self.df['year'].max())}"
        )

    # =========================================================================
    # STEP 2: Load unit from dim_indicator
    # =========================================================================
    def load_units(self):
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 2: LOAD UNIT - dim_indicator")
        self.logger.info("=" * 80)
        dim = read_from_bigquery(self.client, "dim_indicator", layer="gold")
        if "indicator_id" not in dim.columns or "unit" not in dim.columns:
            raise ValueError(
                f"dim_indicator must have columns 'indicator_id' and 'unit'. "
                f"Available columns: {list(dim.columns)}"
            )
        self.df_unit = (
            dim[["indicator_id", "unit"]]
            .drop_duplicates(subset=["indicator_id"])
            .copy()
        )
        self.df_unit["indicator_id"] = self.df_unit["indicator_id"].astype(int)
        self.df_unit["unit"] = self.df_unit["unit"].fillna("").astype(str)
        n_missing_unit = self.df_unit["unit"].eq("").sum()
        self.logger.info(f" dim_indicator rows (unique indicator_id): {len(self.df_unit):,}")
        self.logger.info(f" Indicators with empty unit : {n_missing_unit}")
        # Report fact indicators that have no matching row in dim_indicator.
        fact_ids = set(self.df["indicator_id"].astype(int).unique())
        dim_ids = set(self.df_unit["indicator_id"].unique())
        orphan = fact_ids - dim_ids
        if orphan:
            self.logger.warning(
                f" [WARNING] {len(orphan)} indicator_id values in the fact table not found in "
                f"dim_indicator (unit will be set to ''): {sorted(orphan)}"
            )

    # =========================================================================
    # STEP 3: Merge unit into df
    # =========================================================================
    def _merge_unit(self):
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 3: MERGE UNIT -> fact df")
        self.logger.info("=" * 80)
        before = len(self.df)
        self.df = self.df.merge(self.df_unit, on="indicator_id", how="left")
        self.df["unit"] = self.df["unit"].fillna("").astype(str)
        after = len(self.df)
        assert before == after, (
            f"Row count changed after merging unit: {before} -> {after}"
        )
        n_empty = self.df["unit"].eq("").sum()
        self.logger.info(
            f" Merge OK. Rows: {after:,} | Rows with empty unit: {n_empty}"
        )
        unique_units = self.df["unit"].value_counts().to_dict()
        self.logger.info(" Unit distribution (top 10):")
        for u, cnt in list(unique_units.items())[:10]:
            label = repr(u) if u == "" else u
            self.logger.info(f" {label:<30}: {cnt:,} rows")

    # =========================================================================
    # STEP 4: Detect sdgs_start_year
    # =========================================================================
    def _detect_sdgs_start_year(self) -> int:
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 4: DETECT sdgs_start_year (first FIES year)")
        self.logger.info("=" * 80)
        fies_rows = self.df[
            self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER)
        ]
        if not fies_rows.empty:
            sdgs_start = int(fies_rows["year"].min())
            n_fies_ind = fies_rows["indicator_name"].nunique()
            self.logger.info(f" [Method 1 - FIES explicit] sdgs_start_year = {sdgs_start}")
            self.logger.info(f" FIES indicators found: {n_fies_ind}, first year = {sdgs_start}")
            for nm in fies_rows["indicator_name"].unique():
                min_y = int(fies_rows[fies_rows["indicator_name"] == nm]["year"].min())
                self.logger.info(f" - {nm[:60]} (first year: {min_y})")
            return sdgs_start

        # Fallback: split indicators at the largest gap between their first years.
        self.logger.info(" [Method 1] No FIES rows found -> falling back to largest-gap detection")
        ind_min_year = (
            self.df.groupby("indicator_id")["year"]
            .min().reset_index()
            .rename(columns={"year": "min_year"})
        )
        unique_years = sorted(ind_min_year["min_year"].unique())
        self.logger.info(f" Unique min_year per indicator: {unique_years}")
        if len(unique_years) == 1:
            # Push the start year far beyond the data so every row stays "MDGs".
            sdgs_start = int(unique_years[0]) + 9999
            self.logger.info(" Only 1 cluster -> all MDGs")
        else:
            gaps = [
                (unique_years[i + 1] - unique_years[i], unique_years[i], unique_years[i + 1])
                for i in range(len(unique_years) - 1)
            ]
            gaps.sort(reverse=True)
            _, y_before, y_after = gaps[0]
            sdgs_start = int(y_after)
            self.logger.info(
                f" Largest gap: {y_before} -> {y_after} -> sdgs_start_year = {sdgs_start}"
            )
        return sdgs_start

    # =========================================================================
    # STEP 5: Assign framework
    # =========================================================================
    def _assign_framework(self):
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 5: ASSIGN FRAMEWORK PER ROW")
        self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
        self.logger.info("=" * 80)
        df = self.df.copy()
        df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
        df["framework"] = "MDGs"
        mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year)
        df.loc[mask_sdgs, "framework"] = "SDGs"
        df = df.drop(columns=["_is_sdg_kw"])
        fw_dist = df["framework"].value_counts()
        self.logger.info("\n Framework distribution (rows):")
        for fw, cnt in fw_dist.items():
            self.logger.info(f" {fw:<6}: {cnt:,} rows")
        # Indicators that carry both labels across their year range.
        dual = (
            df.groupby("indicator_id")["framework"]
            .nunique()
            .reset_index()
            .rename(columns={"framework": "n_frameworks"})
        )
        dual_ids = dual[dual["n_frameworks"] > 1]["indicator_id"].tolist()
        self.logger.info(
            f"\n Indicators with DUAL framework (MDGs + SDGs): {len(dual_ids)}"
        )
        if dual_ids:
            for iid in dual_ids:
                ind_name = df[df["indicator_id"] == iid]["indicator_name"].iloc[0]
                yr_range = df[df["indicator_id"] == iid][["year", "framework"]].drop_duplicates()
                mdgs_yrs = sorted(yr_range[yr_range["framework"] == "MDGs"]["year"].tolist())
                sdgs_yrs = sorted(yr_range[yr_range["framework"] == "SDGs"]["year"].tolist())
                self.logger.info(
                    f" [{iid}] {ind_name[:55]}\n"
                    f" MDGs years: {mdgs_yrs}\n"
                    f" SDGs years: {sdgs_yrs}"
                )
        self.df = df

    # =========================================================================
    # STEP 6: Compute norm_value per indicator (direction-aware)
    # =========================================================================
    def _compute_norm_values(self) -> pd.DataFrame:
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)")
        self.logger.info("=" * 80)
        df = self.df.copy()
        norm_parts = []
        for ind_id, grp in df.groupby("indicator_id"):
            grp = grp.copy()
            direction = str(grp["direction"].iloc[0])
            do_invert = _should_invert(
                direction, self.logger, context=f"indicator_id={ind_id}"
            )
            valid_mask = grp["value"].notna()
            n_valid = valid_mask.sum()
            if n_valid < 2:
                # Too few points to normalise: leave norm_value null.
                grp["norm_value"] = np.nan
                norm_parts.append(grp)
                self.logger.warning(
                    f" [SKIP] indicator_id={ind_id}: only {n_valid} valid values"
                )
                continue
            raw = grp.loc[valid_mask, "value"].values
            v_min = raw.min()
            v_max = raw.max()
            normed = np.full(len(grp), np.nan)
            if v_min == v_max:
                normed[valid_mask.values] = 0.5
            else:
                normed[valid_mask.values] = (raw - v_min) / (v_max - v_min)
            if do_invert:
                normed = np.where(np.isnan(normed), np.nan, 1.0 - normed)
            grp["norm_value"] = normed
            norm_parts.append(grp)
        df_normed = pd.concat(norm_parts, ignore_index=True)
        self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators")
        self.logger.info(
            f" norm_value range : "
            f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}"
        )
        self.logger.info(f" norm_value nulls : {df_normed['norm_value'].isna().sum()}")
        return df_normed

    # =========================================================================
    # STEP 7: Compute YoY per (indicator_id, country_id)
    # =========================================================================
    def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 7: COMPUTE YoY COLUMNS (per indicator, per country)")
        self.logger.info("=" * 80)
        parts = []
        groups = df.groupby(["indicator_id", "country_id"], sort=False)
        self.logger.info(f" Processing {groups.ngroups:,} (indicator x country) groups...")
        for _, grp in groups:
            parts.append(_compute_yoy(grp))
        df_out = pd.concat(parts, ignore_index=True)
        self.logger.info(
            f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}"
        )
        self.logger.info(
            f" yoy_value range : "
            f"{df_out['yoy_value'].min():.4f} - {df_out['yoy_value'].max():.4f}"
        )
        self.logger.info(
            f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}"
        )
        self.logger.info(
            f" yoy_norm_value range: "
            f"{df_out['yoy_norm_value'].min():.4f} - {df_out['yoy_norm_value'].max():.4f}"
        )
        return df_out

    # =========================================================================
    # STEP 8: Scale to 1-100
    # =========================================================================
    def _compute_scores(self, df: pd.DataFrame) -> pd.DataFrame:
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 8: SCALE TO 1-100")
        self.logger.info("=" * 80)
        score_parts = []
        for ind_id, grp in df.groupby("indicator_id"):
            grp = grp.copy()
            grp["norm_score_1_100"] = global_minmax(grp["norm_value"])
            score_parts.append(grp)
        df = pd.concat(score_parts, ignore_index=True)
        self.logger.info(
            f" norm_score_1_100 range: "
            f"{df['norm_score_1_100'].min():.2f} - {df['norm_score_1_100'].max():.2f}"
        )
        return df

    # =========================================================================
    # STEP 9: Assign performance label
    # =========================================================================
    def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        performance = "Good" if norm_score_1_100 >= 60
                    = "Bad"  if norm_score_1_100 < 60
                    = null   if norm_score_1_100 is null
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info(
            f"STEP 9: ASSIGN PERFORMANCE LABEL "
            f"(threshold >= {_PERFORMANCE_THRESHOLD} -> Good)"
        )
        self.logger.info("=" * 80)
        df = df.copy()
        df["performance"] = pd.NA
        has_score = df["norm_score_1_100"].notna()
        df.loc[has_score & (df["norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
        df.loc[has_score & (df["norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
        n_good = (df["performance"] == "Good").sum()
        n_bad = (df["performance"] == "Bad").sum()
        n_null = df["performance"].isna().sum()
        total = len(df)
        self.logger.info(f" Good : {n_good:,} ({n_good/total*100:.1f}%)")
        self.logger.info(f" Bad : {n_bad:,} ({n_bad/total*100:.1f}%)")
        self.logger.info(f" Null : {n_null:,} ({n_null/total*100:.1f}%)")
        return df

    # =========================================================================
    # STEP 10: Save agg_indicator_norm to BigQuery
    # =========================================================================
    def _save(self, df: pd.DataFrame) -> int:
        table_name = "agg_indicator_norm"
        self.logger.info("\n" + "=" * 80)
        self.logger.info(f"STEP 10: SAVE -> [Gold] {table_name}")
        self.logger.info("=" * 80)
        out = df[[
            "year",
            "country_id",
            "country_name",
            "indicator_id",
            "indicator_name",
            "unit",
            "direction",
            "pillar_id",
            "pillar_name",
            "framework",
            "value",
            "norm_value",
            "norm_score_1_100",
            "yoy_value",
            "yoy_norm_value",
            "performance",
        ]].copy()
        out = out.sort_values(
            ["year", "country_name", "pillar_name", "indicator_name"]
        ).reset_index(drop=True)
        # Cast columns to the dtypes expected by the BigQuery schema below
        out["year"] = out["year"].astype(int)
        out["country_id"] = out["country_id"].astype(int)
        out["country_name"] = out["country_name"].astype(str)
        out["indicator_id"] = out["indicator_id"].astype(int)
        out["indicator_name"] = out["indicator_name"].astype(str)
        out["unit"] = out["unit"].astype(str)
        out["direction"] = out["direction"].astype(str)
        out["pillar_id"] = out["pillar_id"].astype(int)
        out["pillar_name"] = out["pillar_name"].astype(str)
        out["framework"] = out["framework"].astype(str)
        out["value"] = out["value"].astype(float)
        out["norm_value"] = out["norm_value"].astype(float)
        out["norm_score_1_100"] = out["norm_score_1_100"].astype(float)
        out["yoy_value"] = pd.to_numeric(out["yoy_value"], errors="coerce").astype(float)
        out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float)
        # Cast straight to the nullable string dtype: going through astype(str)
        # would turn pd.NA into the literal text "<NA>" instead of a null.
        out["performance"] = out["performance"].astype("string")
        self.logger.info(f" Columns : {list(out.columns)}")
        self.logger.info(f" Total rows : {len(out):,}")
        self.logger.info(f" Countries : {out['country_id'].nunique()}")
        self.logger.info(f" Indicators : {out['indicator_id'].nunique()}")
        self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")
        self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}")
        self.logger.info(f" Performance: {dict(out['performance'].value_counts())}")
        schema = [
            bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("yoy_value", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("yoy_norm_value", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, out, table_name,
            layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
        )
        log_update(self.client, "DW", table_name, "full_load", rows_loaded)
        self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
        metadata = {
            "source_class": self.__class__.__name__,
            "table_name": table_name,
            "execution_timestamp": self.pipeline_start,
            "duration_seconds": (datetime.now() - self.pipeline_start).total_seconds(),
            "rows_fetched": self.pipeline_metadata["rows_fetched"],
            "rows_transformed": rows_loaded,
            "rows_loaded": rows_loaded,
            "completeness_pct": 100.0,
            "config_snapshot": json.dumps({
                "sdgs_start_year": self.sdgs_start_year,
                "sdg_only_keywords_n": len(SDG_ONLY_KEYWORDS),
                "layer": "gold",
                "normalization": "per_indicator_global_minmax",
                "direction_handling": "lower_better_inverted",
                "yoy_columns": ["yoy_value", "yoy_norm_value"],
                "performance_threshold": _PERFORMANCE_THRESHOLD,
                "unit_source": "dim_indicator",
                "framework_logic": (
                    "SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, "
                    "SDGs if year >= sdgs_start_year. "
                    "Non-SDG_ONLY: always MDGs."
                ),
            }),
            "validation_metrics": json.dumps({
                "total_rows": rows_loaded,
                "n_indicators": int(out["indicator_id"].nunique()),
                "n_countries": int(out["country_id"].nunique()),
                "sdgs_start_year": self.sdgs_start_year,
            }),
        }
        save_etl_metadata(self.client, metadata)
        self.logger.info(" Metadata -> [AUDIT] etl_metadata")
        return rows_loaded

    # =========================================================================
    # STEP 11: Summary log agg_indicator_norm
    # =========================================================================
    def _log_summary(self, df: pd.DataFrame):
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 11: SUMMARY - agg_indicator_norm")
        self.logger.info("=" * 80)

        summary = (
            df.groupby(["framework", "year"])
            .agg(
                n_indicators=("indicator_id", "nunique"),
                n_countries=("country_id", "nunique"),
                avg_score=("norm_score_1_100", "mean"),
            )
            .reset_index()
        )
        self.logger.info(
            f"\n{'Framework':<8} {'Year':<6} {'Indicators':<12} {'Countries':<12} {'Avg Score'}"
        )
        self.logger.info("-" * 55)
        for _, r in summary.iterrows():
            self.logger.info(
                f"{r['framework']:<8} {int(r['year']):<6} "
                f"{int(r['n_indicators']):<12} {int(r['n_countries']):<12} "
                f"{r['avg_score']:.2f}"
            )

        # Performance summary per framework
        self.logger.info("\n Performance summary per Framework:")
        perf_fw = (
            df[df["performance"].notna()]
            .groupby(["framework", "performance"])
            .size()
            .reset_index(name="count")
        )
        for fw in perf_fw["framework"].unique():
            sub = perf_fw[perf_fw["framework"] == fw]
            total = sub["count"].sum()
            self.logger.info(f" [{fw}]")
            for _, r in sub.iterrows():
                self.logger.info(
                    f" {r['performance']:<6}: {int(r['count']):,} "
                    f"({r['count']/total*100:.1f}%)"
                )
        # Top 5 & bottom 5 indicators
        # dropna=False keeps indicators whose unit is missing; the default
        # groupby behaviour silently drops groups with a null key.
        ind_avg = (
            df.groupby(
                ["indicator_id", "indicator_name", "unit", "pillar_name", "direction"],
                dropna=False,
            )
            ["norm_score_1_100"].mean()
            .reset_index()
            .sort_values("norm_score_1_100", ascending=False)
        )
        self.logger.info(
            "\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):"
        )
        for _, r in ind_avg.head(5).iterrows():
            tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
            # NaN is truthy, so check for missing units explicitly.
            unit = f"[{r['unit']}]" if pd.notna(r["unit"]) and r["unit"] else ""
            self.logger.info(
                f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
                f"{r['norm_score_1_100']:.2f} {tag} {unit}"
            )
        self.logger.info("\n BOTTOM 5 Indicators:")
        for _, r in ind_avg.tail(5).iterrows():
            tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
            unit = f"[{r['unit']}]" if pd.notna(r["unit"]) and r["unit"] else ""
            self.logger.info(
                f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
                f"{r['norm_score_1_100']:.2f} {tag} {unit}"
            )

        # Indicators per pillar
        pillar_summary = (
            df.drop_duplicates(subset=["indicator_id", "pillar_name"])
            .groupby("pillar_name")["indicator_id"]
            .count()
            .reset_index()
            .rename(columns={"indicator_id": "n_indicators"})
        )
        self.logger.info("\n Indicators per pillar:")
        for _, r in pillar_summary.iterrows():
            self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
    # =========================================================================
    # STEP 12-16: agg_narrative_indicator (continues from df_final)
    # =========================================================================
    def _build_narrative_table(self, df_final: pd.DataFrame):
        """
        Pipeline for agg_narrative_indicator. Granularity: per indicator_id (one row per indicator).
        The narrative summarizes the full period across all ASEAN countries.
        Runs automatically after agg_indicator_norm completes.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 12-16: agg_narrative_indicator")
        self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)")
        self.logger.info("=" * 80)

        # -- STEP 12: Compute aggregate statistics per (indicator_id, country_id, year) --
        self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---")
        df = df_final.copy()

        # Fixed dimensions per indicator
        dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]

        # ---- 12a. ASEAN average per (indicator_id, year) -> used for first/last & YoY
        df_yr = (
            df.groupby(["indicator_id", "year"])
            .agg(
                avg_value=("value", "mean"),
                avg_norm_score=("norm_score_1_100", "mean"),
                n_countries_year=("country_id", "nunique"),
            )
            .reset_index()
        )

        # ---- 12b. First-year / last-year average value per indicator -------------
        df_first = (
            df_yr.sort_values("year")
            .groupby("indicator_id")
            .first()
            .reset_index()[["indicator_id", "year", "avg_value"]]
            .rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
        )
        df_last = (
            df_yr.sort_values("year")
            .groupby("indicator_id")
            .last()
            .reset_index()[["indicator_id", "year", "avg_value"]]
            .rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
        )

        # ---- 12c. Average norm_score over the whole period -----------------------
        df_score_avg = (
            df_yr.groupby("indicator_id")
            .agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
            .reset_index()
        )

        # ---- 12d. n_countries: distinct countries that ever appear ---------------
        df_nc = (
            df.groupby("indicator_id")["country_id"]
            .nunique()
            .reset_index()
            .rename(columns={"country_id": "n_countries"})
        )
        # ---- 12e. YoY per indicator_id at the ASEAN-average level ----------------
        self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---")
        yoy_parts = []
        for ind_id, grp in df_yr.groupby("indicator_id"):
            grp = grp.sort_values("year").copy()
            # shift(1) compares against the previous available row, which is the
            # previous calendar year only when the series has no gaps.
            grp["prev_avg"] = grp["avg_value"].shift(1)
            grp["yoy"] = np.where(
                grp["avg_value"].notna() & grp["prev_avg"].notna(),
                grp["avg_value"] - grp["prev_avg"],
                np.nan,
            )
            grp = grp.drop(columns=["prev_avg"])
            yoy_parts.append(grp)
        df_yr = pd.concat(yoy_parts, ignore_index=True)

        # Look up the direction per indicator to decide what counts as "improving"
        dir_map = (
            df[["indicator_id", "direction"]]
            .drop_duplicates(subset=["indicator_id"])
            .set_index("indicator_id")["direction"]
            .to_dict()
        )

        def _is_positive_yoy(ind_id, yoy_val):
            """Return True if the YoY change is favourable given the indicator's direction."""
            if pd.isna(yoy_val):
                return False
            lb = _is_lower_better(dir_map.get(ind_id, "positive"))
            return (yoy_val < 0) if lb else (yoy_val > 0)

        # Compute n_yoy_total, n_yoy_positive, best_yoy
        yoy_stats = []
        for ind_id, grp in df_yr.groupby("indicator_id"):
            grp_yoy = grp[grp["yoy"].notna()].copy()
            lb = _is_lower_better(dir_map.get(ind_id, "positive"))
            n_total = len(grp_yoy)
            n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"]))

            # "Best" = the most favourable change
            if n_total > 0:
                if lb:
                    idx_best = grp_yoy["yoy"].idxmin()  # most negative = best
                else:
                    idx_best = grp_yoy["yoy"].idxmax()  # most positive = best
                best_row = grp_yoy.loc[idx_best]
                # Assumes consecutive years; with gaps the true "from" year is earlier.
                best_yoy_from = best_row["year"] - 1
                best_yoy_to = best_row["year"]
            else:
                best_yoy_from = np.nan
                best_yoy_to = np.nan

            yoy_stats.append({
                "indicator_id": ind_id,
                "n_yoy_total": n_total,
                "n_yoy_positive": n_positive,
                "best_yoy_from": best_yoy_from,
                "best_yoy_to": best_yoy_to,
            })
        df_yoy_stats = pd.DataFrame(yoy_stats)
        # ---- 12f. Best & worst country (average value over the whole period) -----
        df_country_avg = (
            df.groupby(["indicator_id", "country_id", "country_name"])
            .agg(country_avg_value=("value", "mean"))
            .reset_index()
        )
        country_stats = []
        for ind_id, grp in df_country_avg.groupby("indicator_id"):
            lb = _is_lower_better(dir_map.get(ind_id, "positive"))
            if lb:
                worst_row = grp.loc[grp["country_avg_value"].idxmax()]
                best_row = grp.loc[grp["country_avg_value"].idxmin()]
            else:
                worst_row = grp.loc[grp["country_avg_value"].idxmin()]
                best_row = grp.loc[grp["country_avg_value"].idxmax()]
            country_stats.append({
                "indicator_id": ind_id,
                "country_worst": worst_row["country_name"],
                "country_best": best_row["country_name"],
            })
        df_country_stats = pd.DataFrame(country_stats)

        # ---- 12g. Fixed dimensions per indicator ---------------------------------
        # Note: drop_duplicates keeps the first row per indicator_id, so an
        # indicator labelled both "MDGs" and "SDGs" keeps only the first-seen framework.
        df_dim = (
            df[["indicator_id"] + dim_cols]
            .drop_duplicates(subset=["indicator_id"])
        )

        # ---- 12h. Merge everything -----------------------------------------------
        df_agg = (
            df_dim
            .merge(df_first, on="indicator_id", how="left")
            .merge(df_last, on="indicator_id", how="left")
            .merge(df_score_avg, on="indicator_id", how="left")
            .merge(df_nc, on="indicator_id", how="left")
            .merge(df_yoy_stats, on="indicator_id", how="left")
            .merge(df_country_stats, on="indicator_id", how="left")
        )
        self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}")
        self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}")
        # -- STEP 14: Assign performance --------------------------------------------
        self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
        df_agg["performance"] = pd.NA
        has_score = df_agg["avg_norm_score_1_100"].notna()
        df_agg.loc[
            has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD),
            "performance",
        ] = "Good"
        df_agg.loc[
            has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD),
            "performance",
        ] = "Bad"
        n_good = (df_agg["performance"] == "Good").sum()
        n_bad = (df_agg["performance"] == "Bad").sum()
        self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")

        # -- STEP 15: Build narrative -----------------------------------------------
        self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---")
        df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1)
        self.logger.info(f" Narratives generated: {len(df_agg):,}")
        self.logger.info("\n Sample (first 2):")
        for _, row in df_agg.head(2).iterrows():
            self.logger.info(
                f"\n [{int(row['indicator_id'])}] {row['indicator_name'][:60]}"
                f"\n -> {row['narrative'][:300]}..."
            )
        # -- STEP 16: Save ----------------------------------------------------------
        self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
        out = df_agg[[
            "indicator_id", "indicator_name", "unit", "direction",
            "pillar_name", "framework",
            "year_min", "year_max", "n_countries",
            "avg_value_first", "avg_value_last",
            "avg_norm_score_1_100", "performance",
            "n_yoy_total", "n_yoy_positive",
            "best_yoy_from", "best_yoy_to",
            "country_worst", "country_best",
            "narrative",
        ]].copy()
        out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)

        # Cast columns to the types expected by the BigQuery schema
        out["indicator_id"] = out["indicator_id"].astype(int)
        out["indicator_name"] = out["indicator_name"].astype(str)
        out["unit"] = out["unit"].fillna("").astype(str)
        out["direction"] = out["direction"].astype(str)
        out["pillar_name"] = out["pillar_name"].astype(str)
        out["framework"] = out["framework"].astype(str)
        out["year_min"] = out["year_min"].astype(int)
        out["year_max"] = out["year_max"].astype(int)
        out["n_countries"] = out["n_countries"].astype(int)
        out["avg_value_first"] = pd.to_numeric(out["avg_value_first"], errors="coerce").astype(float)
        out["avg_value_last"] = pd.to_numeric(out["avg_value_last"], errors="coerce").astype(float)
        out["avg_norm_score_1_100"] = pd.to_numeric(out["avg_norm_score_1_100"], errors="coerce").astype(float)
        # Missing performance is pd.NA, which str() renders as "<NA>" rather than
        # "nan"; casting straight to the nullable string dtype keeps it null.
        out["performance"] = out["performance"].astype("string")
        out["n_yoy_total"] = pd.to_numeric(out["n_yoy_total"], errors="coerce").astype("Int64")
        out["n_yoy_positive"] = pd.to_numeric(out["n_yoy_positive"], errors="coerce").astype("Int64")
        out["best_yoy_from"] = pd.to_numeric(out["best_yoy_from"], errors="coerce").astype("Int64")
        out["best_yoy_to"] = pd.to_numeric(out["best_yoy_to"], errors="coerce").astype("Int64")
        out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string")
        out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string")
        out["narrative"] = out["narrative"].astype(str)
        schema = [
            bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("year_min", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("year_max", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("avg_value_first", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("avg_value_last", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("n_yoy_total", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("n_yoy_positive", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("best_yoy_from", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("best_yoy_to", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("country_worst", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"),
        ]

        rows_loaded = load_to_bigquery(
            self.client, out, "agg_narrative_indicator",
            layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
        )
        log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded)
        self.logger.info(
            f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold"
        )

        metadata = {
            "source_class": self.__class__.__name__,
            "table_name": "agg_narrative_indicator",
            "execution_timestamp": self.pipeline_start,
            "duration_seconds": (datetime.now() - self.pipeline_start).total_seconds(),
            "rows_fetched": self.pipeline_metadata["rows_fetched"],
            "rows_transformed": rows_loaded,
            "rows_loaded": rows_loaded,
            "completeness_pct": 100.0,
            "config_snapshot": json.dumps({
                "source_table": "agg_indicator_norm (in-memory df_final)",
                "granularity": "indicator_id only (all years, all ASEAN countries)",
                "aggregation": "full-period summary per indicator",
                "performance_threshold": _PERFORMANCE_THRESHOLD,
                "layer": "gold",
            }),
            "validation_metrics": json.dumps({
                "total_rows": rows_loaded,
                "n_indicators": int(out["indicator_id"].nunique()),
            }),
        }
        save_etl_metadata(self.client, metadata)
        self.logger.info(" Metadata -> [AUDIT] etl_metadata")
        self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
    # =========================================================================
    # RUN
    # =========================================================================
    def run(self):
        self.pipeline_start = datetime.now()
        self.pipeline_metadata["start_time"] = self.pipeline_start

        self.logger.info("\n" + "=" * 80)
        self.logger.info("INDICATOR NORM AGGREGATION")
        self.logger.info(" Source : fact_asean_food_security_selected")
        self.logger.info(" Dim : dim_indicator (unit)")
        self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold")
        self.logger.info(" agg_narrative_indicator -> fs_asean_gold")
        self.logger.info("=" * 80)

        self.load_data()
        self.load_units()
        self._merge_unit()
        self.sdgs_start_year = self._detect_sdgs_start_year()
        self._assign_framework()
        df_normed = self._compute_norm_values()
        df_yoy = self._compute_yoy_columns(df_normed)
        df_scored = self._compute_scores(df_yoy)
        df_final = self._assign_performance(df_scored)
        rows_loaded = self._save(df_final)
        self.pipeline_metadata["rows_loaded"] = rows_loaded
        self._log_summary(df_final)

        # Build agg_narrative_indicator from the in-memory df_final (no BigQuery re-load)
        self._build_narrative_table(df_final)

        self.pipeline_metadata["end_time"] = datetime.now()
        duration = (
            self.pipeline_metadata["end_time"] - self.pipeline_start
        ).total_seconds()

        self.logger.info("\n" + "=" * 80)
        self.logger.info("COMPLETED")
        self.logger.info("=" * 80)
        self.logger.info(f" Duration : {duration:.2f}s")
        self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}")
        self.logger.info(f" Rows Loaded (norm) : {rows_loaded:,}")
        self.logger.info(f" Rows Loaded (narrative) : {self.pipeline_metadata['rows_loaded_narrative']:,}")
        self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}")
# =============================================================================
# AIRFLOW TASK <-- unchanged
# =============================================================================
def run_indicator_norm_aggregation():
    """
    Airflow task: Build agg_indicator_norm + agg_narrative_indicator.
    Called after analytical_layer_to_gold has finished.
    """
    client = get_bigquery_client()
    agg = IndicatorNormAggregator(client)
    agg.run()
    print(f"agg_indicator_norm loaded : {agg.pipeline_metadata['rows_loaded']:,} rows")
    print(f"agg_narrative_indicator loaded: {agg.pipeline_metadata['rows_loaded_narrative']:,} rows")
# =============================================================================
# MAIN
# =============================================================================
if __name__ == "__main__":
    import io
    import sys

    # Force UTF-8 output so log lines print on consoles with a non-UTF-8 encoding
    if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
    if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")

    print("=" * 80)
    print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
    print(" Source : fact_asean_food_security_selected")
    print(" Dim : dim_indicator (unit)")
    print(" Output : agg_indicator_norm")
    print(" agg_narrative_indicator")
    print("=" * 80)

    logger = setup_logging()
    client = get_bigquery_client()
    agg = IndicatorNormAggregator(client)
    agg.run()

    print("\n" + "=" * 80)
    print("[OK] COMPLETED")
    print("=" * 80)