1402 lines
63 KiB
Python
1402 lines
63 KiB
Python
"""
|
|
BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION
|
|
Tabel 1: agg_indicator_norm -> fs_asean_gold
|
|
Tabel 2: agg_narrative_indicator -> fs_asean_gold
|
|
|
|
=============================================================================
|
|
PERUBAHAN:
|
|
- Ditambahkan kolom indicator_name_id : nama indikator dalam Bahasa Indonesia
|
|
- Ditambahkan kolom pillar_name_id : nama pilar dalam Bahasa Indonesia
|
|
- Kedua kolom ikut tersimpan di BigQuery (schema + DataFrame output)
|
|
=============================================================================
|
|
|
|
agg_indicator_norm
|
|
=============================================================================
|
|
Tujuan:
|
|
Menghitung norm_value per indikator per negara per tahun, sehingga dapat
|
|
melihat performa setiap indikator secara individual (lower_better & higher_better
|
|
sudah dibalik).
|
|
|
|
Framework Classification Logic:
|
|
- Semua indikator berlabel "MDGs" secara default.
|
|
- Indikator yang ada dalam SDG_ONLY_KEYWORDS akan berlabel "SDGs" mulai dari
|
|
sdgs_start_year (tahun pertama FIES hadir, dihitung otomatis).
|
|
- Indikator yang SUDAH ADA sebelum sdgs_start_year DAN juga termasuk
|
|
SDG_ONLY_KEYWORDS akan memiliki DUA label framework:
|
|
* "MDGs" untuk year < sdgs_start_year
|
|
* "SDGs" untuk year >= sdgs_start_year
|
|
- Indikator yang TIDAK ada dalam SDG_ONLY_KEYWORDS selalu "MDGs".
|
|
|
|
YoY Logic:
|
|
- yoy_value : selisih absolut value vs tahun sebelumnya (per indikator, negara)
|
|
- yoy_norm_value : selisih absolut norm_value vs tahun sebelumnya
|
|
|
|
Performance Label Logic:
|
|
- performance : "Good" jika norm_score_1_100 >= 60, "Bad" jika < 60, null jika null
|
|
|
|
Output Schema (agg_indicator_norm):
|
|
year, country_id, country_name,
|
|
indicator_id, indicator_name, indicator_name_id,
|
|
unit, direction,
|
|
pillar_id, pillar_name, pillar_name_id,
|
|
framework,
|
|
value,
|
|
norm_value,
|
|
norm_score_1_100,
|
|
yoy_value,
|
|
yoy_norm_value,
|
|
performance
|
|
|
|
=============================================================================
|
|
agg_narrative_indicator
|
|
=============================================================================
|
|
Tujuan:
|
|
Menghasilkan narasi otomatis per indikator (granularity: indicator_id).
|
|
Narasi membaca kondisi nyata dari data: tren, gap, anomali, konsistensi.
|
|
Tersedia dalam dua bahasa: Inggris (narrative_en) dan Indonesia (narrative_id).
|
|
Tanpa markdown bold (**) agar aman ditampilkan di Looker Studio.
|
|
|
|
Granularity:
|
|
indicator_id (all years, all ASEAN countries)
|
|
|
|
Output Schema (agg_narrative_indicator):
|
|
indicator_id, indicator_name, indicator_name_id,
|
|
unit, direction,
|
|
pillar_name, pillar_name_id,
|
|
framework,
|
|
year_min, year_max, n_countries,
|
|
avg_value_first, avg_value_last,
|
|
avg_norm_score_1_100,
|
|
performance,
|
|
n_yoy_total, n_yoy_positive,
|
|
best_yoy_from, best_yoy_to,
|
|
country_worst, country_best,
|
|
narrative_en,
|
|
narrative_id
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime
|
|
import logging
|
|
import json
|
|
|
|
from scripts.bigquery_config import get_bigquery_client
|
|
from scripts.bigquery_helpers import (
|
|
log_update,
|
|
load_to_bigquery,
|
|
read_from_bigquery,
|
|
setup_logging,
|
|
save_etl_metadata,
|
|
)
|
|
from google.cloud import bigquery
|
|
|
|
|
|
# =============================================================================
|
|
# MAPPING BAHASA INDONESIA
|
|
# =============================================================================
|
|
|
|
# Pillar name translations (English -> Indonesian).
# Each pillar is registered twice: under its title-case spelling and under
# its lower-case spelling, in that order.
_PILLAR_TRANSLATIONS_TITLE_CASE: dict = {
    "Availability": "Ketersediaan",
    "Access": "Akses",
    "Utilization": "Pemanfaatan",
    "Stability": "Stabilitas",
}

PILLAR_NAME_ID_MAP: dict = {
    **_PILLAR_TRANSLATIONS_TITLE_CASE,
    **{
        english.lower(): indonesian
        for english, indonesian in _PILLAR_TRANSLATIONS_TITLE_CASE.items()
    },
}
|
|
|
|
# Indicator name translations (English -> Indonesian).
# Keys are the indicator_name lower-cased and stripped; the per-pillar tables
# below are merged in declaration order into INDICATOR_NAME_ID_MAP.

# --- Availability / Ketersediaan ---
_AVAILABILITY_INDICATOR_NAMES_ID: dict = {
    "prevalence of undernourishment (percent) (3-year average)":
        "Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)",
    "number of people undernourished (million) (3-year average)":
        "Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)",
    "prevalence of severe food insecurity in the total population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan berat pada total populasi (persen) (rata-rata 3 tahun)",
    "prevalence of severe food insecurity in the male adult population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan berat pada populasi dewasa laki-laki (persen) (rata-rata 3 tahun)",
    "prevalence of severe food insecurity in the female adult population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan berat pada populasi dewasa perempuan (persen) (rata-rata 3 tahun)",
    "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan sedang atau berat pada total populasi (persen) (rata-rata 3 tahun)",
    "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan sedang atau berat pada populasi dewasa laki-laki (persen) (rata-rata 3 tahun)",
    "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)":
        "Prevalensi ketidaktahanan pangan sedang atau berat pada populasi dewasa perempuan (persen) (rata-rata 3 tahun)",
    "number of severely food insecure people (million) (3-year average)":
        "Jumlah penduduk mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
    "number of severely food insecure male adults (million) (3-year average)":
        "Jumlah dewasa laki-laki mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
    "number of severely food insecure female adults (million) (3-year average)":
        "Jumlah dewasa perempuan mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
    "number of moderately or severely food insecure people (million) (3-year average)":
        "Jumlah penduduk mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
    "number of moderately or severely food insecure male adults (million) (3-year average)":
        "Jumlah dewasa laki-laki mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
    "number of moderately or severely food insecure female adults (million) (3-year average)":
        "Jumlah dewasa perempuan mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
}

# --- Utilization / Pemanfaatan ---
_UTILIZATION_INDICATOR_NAMES_ID: dict = {
    "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)":
        "Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)",
    "number of children under 5 years of age who are stunted (modeled estimates) (million)":
        "Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)",
    "percentage of children under 5 years affected by wasting (percent)":
        "Persentase anak di bawah 5 tahun yang mengalami wasting (persen)",
    "number of children under 5 years affected by wasting (million)":
        "Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)",
    "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)":
        "Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)",
    "number of children under 5 years of age who are overweight (modeled estimates) (million)":
        "Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)",
    "prevalence of anemia among women of reproductive age (15-49 years) (percent)":
        "Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)",
    "number of women of reproductive age (15-49 years) affected by anemia (million)":
        "Jumlah perempuan usia reproduksi (15-49 tahun) yang mengalami anemia (juta jiwa)",
}

# --- Access / Akses ---
_ACCESS_INDICATOR_NAMES_ID: dict = {
    "gdp per capita (current us$)":
        "PDB per kapita (US$ saat ini)",
    "gdp per capita, ppp (current international $)":
        "PDB per kapita, PPP (internasional $ saat ini)",
    "food consumer price index (cpi)":
        "Indeks Harga Konsumen (IHK) pangan",
    "per capita food supply variability (kcal/cap/day)":
        "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)",
    "percentage of population using at least basic drinking water services":
        "Persentase penduduk yang menggunakan layanan air minum dasar",
    "percentage of population using at least basic sanitation services":
        "Persentase penduduk yang menggunakan layanan sanitasi dasar",
    "prevalence of obesity in the adult population (18 years and older)":
        "Prevalensi obesitas pada populasi dewasa (18 tahun ke atas)",
    "prevalence of overweight in the adult population (18 years and older)":
        "Prevalensi kelebihan berat badan pada populasi dewasa (18 tahun ke atas)",
    "minimum dietary energy requirement (mder) (kcal/cap/day)":
        "Kebutuhan energi pangan minimum (KEPM) (kkal/kapita/hari)",
    "average dietary energy supply adequacy (percent) (3-year average)":
        "Kecukupan rata-rata pasokan energi pangan (persen) (rata-rata 3 tahun)",
    "average protein supply (g/cap/day) (3-year average)":
        "Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)",
    "average supply of protein of animal origin (g/cap/day) (3-year average)":
        "Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)",
}

# --- Stability / Stabilitas ---
_STABILITY_INDICATOR_NAMES_ID: dict = {
    "political stability and absence of violence/terrorism":
        "Stabilitas politik dan ketiadaan kekerasan/terorisme",
    "domestic food price volatility index":
        "Indeks volatilitas harga pangan domestik",
    "per capita food supply variability (kcal/capita/day)":
        "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)",
    "cereal import dependency ratio (percent) (3-year average)":
        "Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)",
    "value of food imports in total merchandise exports (percent) (3-year average)":
        "Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)",
    "share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)":
        "Pangsa pasokan energi pangan dari sereal, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)",
}

INDICATOR_NAME_ID_MAP: dict = {
    **_AVAILABILITY_INDICATOR_NAMES_ID,
    **_UTILIZATION_INDICATOR_NAMES_ID,
    **_ACCESS_INDICATOR_NAMES_ID,
    **_STABILITY_INDICATOR_NAMES_ID,
}
|
|
|
|
|
|
def get_indicator_name_id(indicator_name: str) -> str:
    """Return the Bahasa Indonesia translation of an indicator name.

    The name is converted to ``str``, lower-cased and stripped before it is
    looked up in ``INDICATOR_NAME_ID_MAP``.

    Args:
        indicator_name: English indicator name (any value; it is passed
            through ``str()`` first).

    Returns:
        The mapped Indonesian name, or ``str(indicator_name)`` unchanged when
        no mapping exists.
    """
    original_text = str(indicator_name)
    lookup_key = original_text.lower().strip()
    if lookup_key in INDICATOR_NAME_ID_MAP:
        return INDICATOR_NAME_ID_MAP[lookup_key]
    # Fallback: no translation registered, hand back the original text.
    return original_text
|
|
|
|
|
|
def get_pillar_name_id(pillar_name: str) -> str:
    """Return the Bahasa Indonesia translation of a pillar name.

    The stripped name is first looked up as-is in ``PILLAR_NAME_ID_MAP``.
    When that misses, the lower-cased name is tried, so spellings such as
    ``"AVAILABILITY"`` resolve the same way ``get_indicator_name_id`` resolves
    mixed-case indicator names.

    Args:
        pillar_name: English pillar name (any value; it is passed through
            ``str()`` first).

    Returns:
        The mapped Indonesian name, or ``str(pillar_name)`` unchanged when no
        mapping exists under either spelling.
    """
    original_text = str(pillar_name)
    lookup_key = original_text.strip()
    if lookup_key in PILLAR_NAME_ID_MAP:
        return PILLAR_NAME_ID_MAP[lookup_key]
    # Case-insensitive fallback; the map carries a lower-case key per pillar.
    return PILLAR_NAME_ID_MAP.get(lookup_key.lower(), original_text)
|
|
|
|
|
|
# =============================================================================
|
|
# SDG-ONLY KEYWORD SET
|
|
# =============================================================================
|
|
|
|
# Indicator names that switch to the "SDGs" framework label.
SDG_ONLY_KEYWORDS: frozenset = frozenset({
    # Undernourishment
    "prevalence of undernourishment (percent) (3-year average)",
    "number of people undernourished (million) (3-year average)",
    # Food insecurity prevalence
    "prevalence of severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
    # Food insecurity head counts
    "number of severely food insecure people (million) (3-year average)",
    "number of severely food insecure male adults (million) (3-year average)",
    "number of severely food insecure female adults (million) (3-year average)",
    "number of moderately or severely food insecure people (million) (3-year average)",
    "number of moderately or severely food insecure male adults (million) (3-year average)",
    "number of moderately or severely food insecure female adults (million) (3-year average)",
    # Child nutrition
    "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
    "number of children under 5 years of age who are stunted (modeled estimates) (million)",
    "percentage of children under 5 years affected by wasting (percent)",
    "number of children under 5 years affected by wasting (million)",
    "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
    "number of children under 5 years of age who are overweight (modeled estimates) (million)",
    # Anemia
    "prevalence of anemia among women of reproductive age (15-49 years) (percent)",
    "number of women of reproductive age (15-49 years) affected by anemia (million)",
})

# Lower-cased copy used for case-insensitive membership tests.
_SDG_ONLY_LOWER: frozenset = frozenset(map(str.lower, SDG_ONLY_KEYWORDS))
|
|
|
|
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
|
|
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
|
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
|
|
"number of severely food insecure people (million) (3-year average)",
|
|
"number of moderately or severely food insecure people (million) (3-year average)",
|
|
])
|
|
_FIES_DETECTION_LOWER: frozenset = frozenset(k.lower() for k in _FIES_DETECTION_KEYWORDS)
|
|
|
|
# Direction labels meaning "a lower value is better".
DIRECTION_INVERT_KEYWORDS = frozenset(
    ["negative", "lower_better", "lower_is_better", "inverse", "neg"]
)

# Direction labels meaning "a higher value is better".
DIRECTION_POSITIVE_KEYWORDS = frozenset(
    ["positive", "higher_better", "higher_is_better"]
)

# Score threshold constant for the performance label.
_PERFORMANCE_THRESHOLD: float = 60.0
|
|
|
|
|
|
# =============================================================================
|
|
# PURE HELPERS
|
|
# =============================================================================
|
|
|
|
def _should_invert(direction: str, logger=None, context: str = "") -> bool:
    """Tell whether an indicator direction means "lower is better".

    Args:
        direction: Direction label; compared lower-cased and stripped.
        logger: Optional logger that receives a warning when the label is in
            neither keyword set.
        context: Optional text appended in parentheses to that warning.

    Returns:
        True when the label is in ``DIRECTION_INVERT_KEYWORDS``; False when it
        is in ``DIRECTION_POSITIVE_KEYWORDS`` or is not recognised at all.
    """
    normalized = str(direction).lower().strip()
    for keyword_set, verdict in (
        (DIRECTION_INVERT_KEYWORDS, True),
        (DIRECTION_POSITIVE_KEYWORDS, False),
    ):
        if normalized in keyword_set:
            return verdict

    # Unknown label: warn if possible and treat it as positive.
    if logger:
        context_suffix = f"({context})" if context else ""
        logger.warning(
            f" [DIRECTION WARNING] Unknown direction '{direction}' "
            f"{context_suffix}. Defaulting to positive (no invert)."
        )
    return False
|
|
|
|
|
|
def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series:
    """Min-max scale a series onto ``[lo, hi]`` using its own min and max.

    Missing values never receive a score: they stay NaN in every branch,
    including the constant-series branch.

    Args:
        series: Values to scale; NaN entries are ignored when computing the
            minimum and maximum.
        lo: Score assigned to the minimum observed value.
        hi: Score assigned to the maximum observed value.

    Returns:
        A float series on the same index as ``series``. All-NaN or empty input
        yields all NaN. When every observed value is identical, the observed
        positions get the midpoint ``(lo + hi) / 2``.
    """
    observed = series.notna()
    result = np.full(len(series), np.nan)

    values = series[observed].values
    if len(values) == 0:
        return pd.Series(result, index=series.index)

    v_min, v_max = values.min(), values.max()
    if v_min == v_max:
        # No spread to scale by: use the midpoint, but only where a value exists.
        result[observed.values] = (lo + hi) / 2.0
    else:
        result[observed.values] = lo + (values - v_min) / (v_max - v_min) * (hi - lo)
    return pd.Series(result, index=series.index)
|
|
|
|
|
|
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
|
df = df.sort_values("year").copy()
|
|
df["value_prev"] = df["value"].shift(1)
|
|
df["norm_value_prev"] = df["norm_value"].shift(1)
|
|
df["yoy_value"] = np.where(
|
|
df["value"].notna() & df["value_prev"].notna(),
|
|
df["value"] - df["value_prev"],
|
|
np.nan,
|
|
)
|
|
df["yoy_norm_value"] = np.where(
|
|
df["norm_value"].notna() & df["norm_value_prev"].notna(),
|
|
df["norm_value"] - df["norm_value_prev"],
|
|
np.nan,
|
|
)
|
|
df = df.drop(columns=["value_prev", "norm_value_prev"])
|
|
return df
|
|
|
|
|
|
def _is_lower_better(direction: str) -> bool:
    """Return True when ``direction`` is one of ``DIRECTION_INVERT_KEYWORDS``.

    The label is converted to ``str``, lower-cased and stripped before the
    membership test.
    """
    normalized = str(direction).lower().strip()
    return normalized in DIRECTION_INVERT_KEYWORDS
|
|
|
|
|
|
# =============================================================================
|
|
# NARRATIVE CONDITION DETECTORS
|
|
# =============================================================================
|
|
|
|
def _detect_trend(scores_by_year: pd.Series, lower_better: bool) -> str:
|
|
if len(scores_by_year) < 3:
|
|
return "insufficient_data"
|
|
|
|
years = sorted(scores_by_year.index)
|
|
vals = [scores_by_year[y] for y in years if not pd.isna(scores_by_year.get(y, np.nan))]
|
|
|
|
if len(vals) < 3:
|
|
return "insufficient_data"
|
|
|
|
x = np.arange(len(vals))
|
|
slope = np.polyfit(x, vals, 1)[0]
|
|
|
|
improving = (slope > 0 and not lower_better) or (slope < 0 and lower_better)
|
|
|
|
mid = len(vals) // 2
|
|
first_half = vals[:mid]
|
|
second_half = vals[mid:]
|
|
slope1 = np.polyfit(np.arange(len(first_half)), first_half, 1)[0] if len(first_half) > 1 else 0
|
|
slope2 = np.polyfit(np.arange(len(second_half)), second_half, 1)[0] if len(second_half) > 1 else 0
|
|
|
|
cv = np.std(vals) / (np.mean(vals) + 1e-9)
|
|
|
|
if cv > 0.25:
|
|
return "fluctuating"
|
|
|
|
if improving:
|
|
if lower_better:
|
|
slowing = slope2 > slope1
|
|
else:
|
|
slowing = slope2 < slope1
|
|
return "improving_slowing" if slowing else "improving_consistent"
|
|
else:
|
|
return "deteriorating"
|
|
|
|
|
|
def _detect_gap_trend(df_ind: pd.DataFrame, lower_better: bool) -> str:
|
|
std_by_year = (
|
|
df_ind.groupby("year")["value"]
|
|
.std()
|
|
.dropna()
|
|
)
|
|
if len(std_by_year) < 3:
|
|
return "unknown"
|
|
|
|
years = sorted(std_by_year.index)
|
|
stds = [std_by_year[y] for y in years]
|
|
slope = np.polyfit(np.arange(len(stds)), stds, 1)[0]
|
|
|
|
if abs(slope) < 0.01 * np.mean(stds):
|
|
return "stable"
|
|
return "widening" if slope > 0 else "narrowing"
|
|
|
|
|
|
def _detect_anomaly_year(scores_by_year: pd.Series) -> tuple:
|
|
if len(scores_by_year) < 3:
|
|
return None, None
|
|
|
|
years = sorted(scores_by_year.index)
|
|
deltas = {}
|
|
for i in range(1, len(years)):
|
|
y_prev = years[i - 1]
|
|
y_curr = years[i]
|
|
v_prev = scores_by_year.get(y_prev, np.nan)
|
|
v_curr = scores_by_year.get(y_curr, np.nan)
|
|
if not pd.isna(v_prev) and not pd.isna(v_curr):
|
|
deltas[y_curr] = v_curr - v_prev
|
|
|
|
if not deltas:
|
|
return None, None
|
|
|
|
max_drop_year = min(deltas, key=deltas.get)
|
|
max_rise_year = max(deltas, key=deltas.get)
|
|
|
|
threshold = 1.5 * np.std(list(deltas.values()))
|
|
if abs(deltas[max_drop_year]) > threshold and deltas[max_drop_year] < 0:
|
|
return max_drop_year, "drop"
|
|
if abs(deltas[max_rise_year]) > threshold and deltas[max_rise_year] > 0:
|
|
return max_rise_year, "rise"
|
|
|
|
return None, None
|
|
|
|
|
|
def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple:
|
|
country_avg = (
|
|
df_ind.groupby("country_name")["value"]
|
|
.mean()
|
|
.dropna()
|
|
)
|
|
if country_avg.empty:
|
|
return None, None, False
|
|
|
|
if lower_better:
|
|
best = country_avg.idxmin()
|
|
worst = country_avg.idxmax()
|
|
else:
|
|
best = country_avg.idxmax()
|
|
worst = country_avg.idxmin()
|
|
|
|
asean_avg_by_year = df_ind.groupby("year")["value"].mean()
|
|
country_by_year = df_ind[df_ind["country_name"] == best].set_index("year")["value"]
|
|
|
|
years_both = set(asean_avg_by_year.index) & set(country_by_year.index)
|
|
if not years_both:
|
|
return best, worst, False
|
|
|
|
if lower_better:
|
|
consistent = all(
|
|
country_by_year[y] <= asean_avg_by_year[y]
|
|
for y in years_both
|
|
if not pd.isna(country_by_year.get(y, np.nan))
|
|
)
|
|
else:
|
|
consistent = all(
|
|
country_by_year[y] >= asean_avg_by_year[y]
|
|
for y in years_both
|
|
if not pd.isna(country_by_year.get(y, np.nan))
|
|
)
|
|
|
|
return best, worst, consistent
|
|
|
|
|
|
# =============================================================================
|
|
# NARRATIVE BUILDER — plain text, no markdown, bilingual
|
|
# =============================================================================
|
|
|
|
def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tuple:
    """Build the plain-text English and Indonesian narrative for one indicator.

    The narrative is a header followed by up to four sentences: overall trend,
    gap among countries, anomaly year, and best/worst country. Empty sentences
    are skipped and no markdown is emitted.

    Args:
        row: Summary row of one indicator. Reads ``indicator_id``,
            ``indicator_name``, ``unit``, ``direction``, ``pillar_name``,
            ``framework``, ``year_min``, ``year_max`` and, when present,
            ``avg_value_first`` / ``avg_value_last``.
        df_full: Detail rows for all indicators; filtered on ``indicator_id``
            and passed to the detector helpers, which read ``year``,
            ``country_name`` and ``value``.

    Returns:
        ``(narrative_en, narrative_id)``.
    """
    ind_id = int(row["indicator_id"])
    ind_name = str(row["indicator_name"]).strip()
    raw_unit = row["unit"]
    # NaN/None are treated as "no unit"; a bare truthiness test would let NaN
    # through and render the literal text "nan" after every number.
    unit = "" if pd.isna(raw_unit) or not raw_unit else str(raw_unit).strip()
    direction = str(row["direction"]).strip()
    pillar = str(row["pillar_name"]).strip()
    framework = str(row["framework"]).strip()
    year_min = int(row["year_min"])
    year_max = int(row["year_max"])
    lower_better = _is_lower_better(direction)

    df_ind = df_full[df_full["indicator_id"] == ind_id]

    if df_ind.empty:
        na_en = f"{ind_name} ({framework}, {pillar}): Insufficient data for analysis."
        na_id = f"{ind_name} ({framework}, {pillar}): Data tidak cukup untuk dianalisis."
        return na_en, na_id

    # Yearly mean of the raw value across all countries.
    asean_avg_by_year = df_ind.groupby("year")["value"].mean().dropna()

    trend_label = _detect_trend(asean_avg_by_year, lower_better)
    gap_label = _detect_gap_trend(df_ind, lower_better)
    anomaly_year, anomaly_dir = _detect_anomaly_year(asean_avg_by_year)
    best_country, worst_country, is_consistent = _detect_consistency(df_ind, lower_better)

    avg_first = row.get("avg_value_first", np.nan)
    avg_last = row.get("avg_value_last", np.nan)

    def fmt(v):
        # Fewer decimals for larger magnitudes; the unit is appended if known.
        if pd.isna(v):
            return "N/A"
        abs_v = abs(v)
        if abs_v >= 1000:
            s = f"{v:,.1f}"
        elif abs_v >= 10:
            s = f"{v:.2f}"
        else:
            s = f"{v:.3f}"
        return f"{s} {unit}".strip() if unit else s

    first_txt, last_txt = fmt(avg_first), fmt(avg_last)

    # Header: identical in both languages.
    header = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):"
    sentences_en = [header]
    sentences_id = [header]

    # --- Trend ---
    trend_map_en = {
        "improving_consistent": f"Regional average improved consistently from {first_txt} to {last_txt}.",
        "improving_slowing": f"Regional average improved from {first_txt} to {last_txt}, though the pace slowed in recent years.",
        "deteriorating": f"Regional average worsened from {first_txt} to {last_txt} over the period.",
        "fluctuating": f"Regional average fluctuated between {first_txt} and {last_txt} with no clear trend.",
        "insufficient_data": "Trend analysis is limited due to sparse data.",
    }
    trend_map_id = {
        "improving_consistent": f"Rata-rata regional membaik secara konsisten dari {first_txt} menjadi {last_txt}.",
        "improving_slowing": f"Rata-rata regional membaik dari {first_txt} menjadi {last_txt}, namun lajunya melambat dalam beberapa tahun terakhir.",
        "deteriorating": f"Rata-rata regional memburuk dari {first_txt} menjadi {last_txt} sepanjang periode.",
        "fluctuating": f"Rata-rata regional berfluktuasi antara {first_txt} dan {last_txt} tanpa tren yang jelas.",
        "insufficient_data": "Analisis tren terbatas karena data yang tersedia tidak cukup.",
    }
    sentences_en.append(trend_map_en.get(trend_label, ""))
    sentences_id.append(trend_map_id.get(trend_label, ""))

    # --- Gap among countries ("unknown" adds no sentence) ---
    if gap_label == "widening":
        sentences_en.append("Disparity among ASEAN countries has widened over time, indicating unequal progress.")
        sentences_id.append("Kesenjangan antar negara ASEAN melebar seiring waktu, menunjukkan kemajuan yang tidak merata.")
    elif gap_label == "narrowing":
        sentences_en.append("Disparity among ASEAN countries has narrowed, suggesting more balanced regional progress.")
        sentences_id.append("Kesenjangan antar negara ASEAN menyempit, mengindikasikan kemajuan regional yang lebih merata.")
    elif gap_label == "stable":
        sentences_en.append("The gap among ASEAN countries remained relatively stable throughout the period.")
        sentences_id.append("Kesenjangan antar negara ASEAN relatif stabil sepanjang periode.")

    # --- Anomaly year ---
    # The anomaly is detected on the raw value, so the wording stays neutral
    # ("decline" / "increase"): a rise is not an improvement for
    # lower-is-better indicators.
    if anomaly_year is not None:
        if anomaly_dir == "drop":
            sentences_en.append(f"A notable decline was recorded in {anomaly_year}, which stood out from the overall pattern.")
            sentences_id.append(f"Penurunan signifikan tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.")
        elif anomaly_dir == "rise":
            sentences_en.append(f"A sharp increase was observed in {anomaly_year}, standing out from the overall pattern.")
            sentences_id.append(f"Peningkatan tajam tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.")

    # --- Best / worst country ---
    if best_country and worst_country:
        if is_consistent:
            sentences_en.append(
                f"{best_country} consistently performed above the regional average, "
                f"while {worst_country} consistently lagged behind."
            )
            sentences_id.append(
                f"{best_country} secara konsisten berada di atas rata-rata regional, "
                f"sementara {worst_country} secara konsisten tertinggal."
            )
        else:
            sentences_en.append(
                f"Overall, {best_country} showed the best performance, "
                f"while {worst_country} had the weakest results across the period."
            )
            sentences_id.append(
                f"Secara keseluruhan, {best_country} menunjukkan performa terbaik, "
                f"sementara {worst_country} memiliki hasil terlemah sepanjang periode."
            )

    narrative_en = " ".join(s for s in sentences_en if s)
    narrative_id = " ".join(s for s in sentences_id if s)
    return narrative_en, narrative_id
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN CLASS
|
|
# =============================================================================
|
|
|
|
class IndicatorNormAggregator:
|
|
"""
|
|
Hitung norm_value per indikator untuk seluruh data di
|
|
fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm.
|
|
Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator.
|
|
"""
|
|
|
|
def __init__(self, client: bigquery.Client):
    """Store the BigQuery client and initialise empty pipeline state.

    Args:
        client: BigQuery client used by the load steps.
    """
    self.client = client

    # Class-named logger that does not propagate to ancestor loggers.
    class_logger = logging.getLogger(type(self).__name__)
    class_logger.propagate = False
    self.logger = class_logger

    # Working state; df and df_unit are filled by load_data / load_units.
    self.df = None
    self.df_unit = None
    self.sdgs_start_year = None

    # Run bookkeeping: row counters start at 0, timestamps start unset.
    self.pipeline_start = None
    self.pipeline_metadata = {
        **dict.fromkeys(("rows_fetched", "rows_loaded", "rows_loaded_narrative"), 0),
        **dict.fromkeys(("start_time", "end_time"), None),
    }
|
|
|
|
# =========================================================================
|
|
# STEP 1: Load fact table
|
|
# =========================================================================
|
|
|
|
def load_data(self):
    """STEP 1: load the fact table into ``self.df`` and validate it.

    Reads ``fact_asean_food_security_selected`` from the gold layer, checks
    that the required columns exist, replaces NULL directions with
    ``"positive"``, records the fetched row count and logs a summary.

    Raises:
        ValueError: When any required column is missing.
    """
    banner = "=" * 80
    self.logger.info("\n" + banner)
    self.logger.info("STEP 1: LOAD DATA — fact_asean_food_security_selected")
    self.logger.info(banner)

    self.df = read_from_bigquery(
        self.client, "fact_asean_food_security_selected", layer="gold"
    )

    required = {
        "country_id", "country_name",
        "indicator_id", "indicator_name", "direction",
        "pillar_id", "pillar_name",
        "year", "value",
    }
    missing = required - set(self.df.columns)
    if missing:
        raise ValueError(f"Kolom tidak ditemukan: {missing}")

    # A NULL direction is treated as "positive".
    n_null = self.df["direction"].isna().sum()
    if n_null > 0:
        self.logger.warning(f" {n_null} rows direction NULL -> diisi 'positive'")
        self.df["direction"] = self.df["direction"].fillna("positive")

    row_count = len(self.df)
    self.pipeline_metadata["rows_fetched"] = row_count

    first_year = int(self.df["year"].min())
    last_year = int(self.df["year"].max())
    self.logger.info(f" Rows : {row_count:,}")
    self.logger.info(f" Countries : {self.df['country_id'].nunique()}")
    self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}")
    self.logger.info(f" Years : {first_year} - {last_year}")
|
|
|
|
# =========================================================================
|
|
# STEP 2: Load unit dari dim_indicator
|
|
# =========================================================================
|
|
|
|
def load_units(self):
    """STEP 2: load one unit per indicator from ``dim_indicator``.

    Builds ``self.df_unit`` with one row per ``indicator_id`` (first
    occurrence kept), ``indicator_id`` cast to int and missing units replaced
    by an empty string. Fact-table indicator ids that have no row in the
    dimension are logged as a warning.

    Raises:
        ValueError: When ``dim_indicator`` lacks ``indicator_id`` or ``unit``.
    """
    banner = "=" * 80
    self.logger.info("\n" + banner)
    self.logger.info("STEP 2: LOAD UNIT — dim_indicator")
    self.logger.info(banner)

    dim = read_from_bigquery(self.client, "dim_indicator", layer="gold")

    if not {"indicator_id", "unit"}.issubset(dim.columns):
        raise ValueError(
            "dim_indicator harus punya kolom 'indicator_id' dan 'unit'. "
            f"Kolom tersedia: {list(dim.columns)}"
        )

    units = dim[["indicator_id", "unit"]].drop_duplicates(subset=["indicator_id"]).copy()
    self.df_unit = units
    units["indicator_id"] = units["indicator_id"].astype(int)
    units["unit"] = units["unit"].fillna("").astype(str)

    n_missing_unit = units["unit"].eq("").sum()
    self.logger.info(f" dim_indicator rows (unique indicator_id): {len(units):,}")
    self.logger.info(f" Indicator dengan unit kosong : {n_missing_unit}")

    # Indicator ids present in the fact table but absent from the dimension.
    fact_ids = set(self.df["indicator_id"].astype(int).unique())
    dim_ids = set(units["indicator_id"].unique())
    orphan = fact_ids - dim_ids
    if orphan:
        self.logger.warning(
            f" [WARNING] {len(orphan)} indicator_id di fact tidak ditemukan di "
            f"dim_indicator (unit akan diisi ''): {sorted(orphan)}"
        )
|
|
|
|
# =========================================================================
|
|
# STEP 3: Merge unit ke df
|
|
# =========================================================================
|
|
|
|
def _merge_unit(self):
|
|
self.logger.info("\n" + "=" * 80)
|
|
self.logger.info("STEP 3: MERGE UNIT -> fact df")
|
|
self.logger.info("=" * 80)
|
|
|
|
before = len(self.df)
|
|
self.df = self.df.merge(self.df_unit, on="indicator_id", how="left")
|
|
self.df["unit"] = self.df["unit"].fillna("").astype(str)
|
|
after = len(self.df)
|
|
|
|
assert before == after, (
|
|
f"Row count berubah setelah merge unit: {before} -> {after}"
|
|
)
|
|
|
|
n_empty = self.df["unit"].eq("").sum()
|
|
self.logger.info(
|
|
f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}"
|
|
)
|
|
|
|
# =========================================================================
|
|
# STEP 3b: Tambah kolom nama Bahasa Indonesia
|
|
# =========================================================================
|
|
|
|
def _add_indonesia_name_columns(self):
    """STEP 3b: add the Bahasa Indonesia name columns to ``self.df``.

    ``indicator_name_id`` and ``pillar_name_id`` are derived from
    ``indicator_name`` and ``pillar_name`` with the translation helpers. The
    number of rows whose translated name differs from the original is logged,
    followed by a sample of indicator pairs and every distinct pillar pair.
    """
    banner = "=" * 80
    self.logger.info("\n" + banner)
    self.logger.info("STEP 3b: ADD BAHASA INDONESIA NAME COLUMNS")
    self.logger.info(banner)

    for source_col, target_col, translate in (
        ("indicator_name", "indicator_name_id", get_indicator_name_id),
        ("pillar_name", "pillar_name_id", get_pillar_name_id),
    ):
        self.df[target_col] = self.df[source_col].map(translate).astype(str)

    # A row counts as mapped when the translated name differs from the original.
    n_indicator_mapped = self.df["indicator_name_id"].ne(self.df["indicator_name"]).sum()
    n_pillar_mapped = self.df["pillar_name_id"].ne(self.df["pillar_name"]).sum()
    self.logger.info(f" indicator_name_id mapped rows : {n_indicator_mapped:,}")
    self.logger.info(f" pillar_name_id mapped rows : {n_pillar_mapped:,}")

    # Sample of the indicator mapping: first five distinct pairs.
    indicator_pairs = (
        self.df[["indicator_name", "indicator_name_id"]].drop_duplicates().head(5)
    )
    self.logger.info("\n Sample indicator mapping (EN -> ID):")
    for name_en, name_id in zip(
        indicator_pairs["indicator_name"], indicator_pairs["indicator_name_id"]
    ):
        self.logger.info(f" EN: {name_en[:55]}")
        self.logger.info(f" ID: {name_id[:55]}")

    # Every distinct pillar pair.
    pillar_pairs = self.df[["pillar_name", "pillar_name_id"]].drop_duplicates()
    self.logger.info("\n Pillar mapping (EN -> ID):")
    for pillar_en, pillar_id_name in zip(
        pillar_pairs["pillar_name"], pillar_pairs["pillar_name_id"]
    ):
        self.logger.info(f" {pillar_en:<20} -> {pillar_id_name}")
|
|
|
|
# =========================================================================
|
|
# STEP 4: Deteksi sdgs_start_year
|
|
# =========================================================================
|
|
|
|
def _detect_sdgs_start_year(self) -> int:
    """Step 4: work out the first year from which SDG labels apply.

    Method 1: when at least one row has a lower-cased, stripped
    ``indicator_name`` contained in ``_FIES_DETECTION_LOWER``, the earliest
    ``year`` among those rows is returned.

    Fallback: the first year of every ``indicator_id`` is collected, the
    distinct first-years are sorted, and the year that follows the largest
    gap between two consecutive first-years is returned.  When all
    indicators share a single first year, that year plus 9999 is returned
    (the log message for this case reads "all MDGs").

    Returns:
        The detected start year as a plain ``int``.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log("STEP 4: DETECT sdgs_start_year (first FIES year)")
    log("=" * 80)

    normalized_names = self.df["indicator_name"].str.lower().str.strip()
    fies_subset = self.df[normalized_names.isin(_FIES_DETECTION_LOWER)]
    if not fies_subset.empty:
        sdgs_start = int(fies_subset["year"].min())
        log(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}")
        return sdgs_start

    log(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar")
    first_year_per_indicator = self.df.groupby("indicator_id")["year"].min()
    first_years = sorted(first_year_per_indicator.unique())

    if len(first_years) == 1:
        sdgs_start = int(first_years[0]) + 9999
        log(" Hanya 1 cluster -> semua MDGs")
        return sdgs_start

    # Tuples are ordered by gap width first, then by the years themselves,
    # so the head of the descending sort is the widest (and, on ties, the
    # latest) gap.
    ranked_gaps = sorted(
        (
            (later - earlier, earlier, later)
            for earlier, later in zip(first_years, first_years[1:])
        ),
        reverse=True,
    )
    _, y_before, y_after = ranked_gaps[0]
    sdgs_start = int(y_after)
    log(f" Gap terbesar: {y_before} -> {y_after} -> sdgs_start_year = {sdgs_start}")
    return sdgs_start
|
|
|
|
# =========================================================================
|
|
# STEP 5: Assign framework
|
|
# =========================================================================
|
|
|
|
def _assign_framework(self):
    """Step 5: label every row of ``self.df`` as "MDGs" or "SDGs".

    All rows start as "MDGs".  A row becomes "SDGs" when its lower-cased,
    stripped ``indicator_name`` is in ``_SDG_ONLY_LOWER`` and its ``year``
    is at or after ``self.sdgs_start_year``.  The labelled copy replaces
    ``self.df``; the per-label row counts are logged.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log("STEP 5: ASSIGN FRAMEWORK PER BARIS")
    log(f" sdgs_start_year = {self.sdgs_start_year}")
    log("=" * 80)

    labelled = self.df.copy()
    is_sdg_keyword = (
        labelled["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
    )
    in_sdg_era = labelled["year"] >= self.sdgs_start_year

    labelled["framework"] = "MDGs"
    labelled.loc[is_sdg_keyword & in_sdg_era, "framework"] = "SDGs"

    log("\n Framework distribution (rows):")
    for label, row_count in labelled["framework"].value_counts().items():
        log(f" {label:<6}: {row_count:,} rows")

    self.df = labelled
|
|
|
|
# =========================================================================
|
|
# STEP 6: Hitung norm_value per indikator
|
|
# =========================================================================
|
|
|
|
def _compute_norm_values(self) -> pd.DataFrame:
    """Step 6: min-max normalise ``value`` separately for each indicator.

    For every ``indicator_id`` group of ``self.df``:

    * fewer than two non-null values -> ``norm_value`` is NaN for the whole
      group and a warning is logged;
    * all non-null values equal -> ``norm_value`` is 0.5 for those rows;
    * otherwise ``(value - min) / (max - min)`` over the non-null values.

    When ``_should_invert`` (called with the group's first ``direction``)
    returns true, the normalised values are replaced by ``1 - norm_value``.
    Rows with a null ``value`` always keep a NaN ``norm_value``.

    Returns:
        A new frame (groups concatenated in ``indicator_id`` order, index
        reset) carrying the extra ``norm_value`` column.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 6: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)")
    self.logger.info("=" * 80)

    pieces = []
    for indicator_id, block in self.df.groupby("indicator_id"):
        block = block.copy()
        invert = _should_invert(
            str(block["direction"].iloc[0]),
            self.logger,
            context=f"indicator_id={indicator_id}",
        )

        has_value = block["value"].notna()
        valid_count = has_value.sum()

        if valid_count < 2:
            # A range cannot be formed from fewer than two observations.
            block["norm_value"] = np.nan
            pieces.append(block)
            self.logger.warning(
                f" [SKIP] indicator_id={indicator_id}: only {valid_count} valid values"
            )
            continue

        observed = block.loc[has_value, "value"].values
        lowest = observed.min()
        highest = observed.max()

        scaled = np.full(len(block), np.nan)
        # A constant series has no spread; place it at the midpoint.
        scaled[has_value.values] = (
            0.5 if lowest == highest else (observed - lowest) / (highest - lowest)
        )

        if invert:
            # NaN stays NaN under subtraction, so missing rows are untouched.
            scaled = 1.0 - scaled

        block["norm_value"] = scaled
        pieces.append(block)

    df_normed = pd.concat(pieces, ignore_index=True)
    self.logger.info(
        f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators"
    )
    return df_normed
|
|
|
|
# =========================================================================
|
|
# STEP 7: Hitung YoY
|
|
# =========================================================================
|
|
|
|
def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """Step 7: add the year-over-year columns per (indicator, country).

    ``df`` is split by ``indicator_id`` and ``country_id`` (in order of
    first appearance), each slice is handed to ``_compute_yoy``, and the
    results are concatenated with a fresh index.  The null counts of
    ``yoy_value`` and ``yoy_norm_value`` are logged.

    Args:
        df: Frame that already carries ``norm_value``.

    Returns:
        The concatenated output of ``_compute_yoy`` for all groups.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log("STEP 7: COMPUTE YoY COLUMNS (per indicator, per country)")
    log("=" * 80)

    grouped = df.groupby(["indicator_id", "country_id"], sort=False)
    log(f" Processing {grouped.ngroups:,} (indicator x country) groups...")

    combined = pd.concat(
        [_compute_yoy(chunk) for _, chunk in grouped],
        ignore_index=True,
    )

    missing_yoy = combined["yoy_value"].isna().sum()
    missing_yoy_norm = combined["yoy_norm_value"].isna().sum()
    log(f" yoy_value nulls : {missing_yoy:,}")
    log(f" yoy_norm_value nulls: {missing_yoy_norm:,}")
    return combined
|
|
|
|
# =========================================================================
|
|
# STEP 8: Scale ke 1-100
|
|
# =========================================================================
|
|
|
|
def _compute_scores(self, df: pd.DataFrame) -> pd.DataFrame:
    """Step 8: derive ``norm_score_1_100`` from ``norm_value`` per indicator.

    Each ``indicator_id`` group gets a ``norm_score_1_100`` column computed
    by ``global_minmax`` on that group's ``norm_value``.  The groups are
    concatenated in ``indicator_id`` order with a fresh index and the
    overall score range is logged.

    Args:
        df: Frame carrying ``indicator_id`` and ``norm_value``.

    Returns:
        A new frame with the additional ``norm_score_1_100`` column.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 8: SCALE TO 1-100")
    self.logger.info("=" * 80)

    scored = pd.concat(
        [
            chunk.assign(norm_score_1_100=global_minmax(chunk["norm_value"]))
            for _, chunk in df.groupby("indicator_id")
        ],
        ignore_index=True,
    )

    lowest = scored["norm_score_1_100"].min()
    highest = scored["norm_score_1_100"].max()
    self.logger.info(f" norm_score_1_100 range: {lowest:.2f} - {highest:.2f}")
    return scored
|
|
|
|
# =========================================================================
|
|
# STEP 9: Assign performance label
|
|
# =========================================================================
|
|
|
|
def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame:
    """Step 9: attach the "Good" / "Bad" performance label.

    A row is "Good" when ``norm_score_1_100`` is at least
    ``_PERFORMANCE_THRESHOLD``, "Bad" when it is below it, and ``pd.NA``
    when the score is missing.  The counts and shares of each outcome are
    logged.

    Args:
        df: Frame carrying ``norm_score_1_100``.

    Returns:
        A copy of ``df`` with the additional ``performance`` column.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log(
        f"STEP 9: ASSIGN PERFORMANCE LABEL "
        f"(threshold >= {_PERFORMANCE_THRESHOLD} -> Good)"
    )
    log("=" * 80)

    labelled = df.copy()
    labelled["performance"] = pd.NA

    score = labelled["norm_score_1_100"]
    scored_rows = score.notna()
    labelled.loc[scored_rows & score.ge(_PERFORMANCE_THRESHOLD), "performance"] = "Good"
    labelled.loc[scored_rows & score.lt(_PERFORMANCE_THRESHOLD), "performance"] = "Bad"

    total = len(labelled)
    tallies = (
        ("Good", labelled["performance"].eq("Good").sum()),
        ("Bad", labelled["performance"].eq("Bad").sum()),
        ("Null", labelled["performance"].isna().sum()),
    )
    for label, count in tallies:
        log(f" {label} : {count:,} ({count/total*100:.1f}%)")
    return labelled
|
|
|
|
# =========================================================================
|
|
# STEP 10: Save agg_indicator_norm
|
|
# =========================================================================
|
|
|
|
def _save(self, df: pd.DataFrame) -> int:
    """Step 10: load ``agg_indicator_norm`` into the gold layer.

    Selects and orders the output columns, sorts the rows, coerces every
    column to the dtype matching the BigQuery schema, loads the frame with
    ``WRITE_TRUNCATE`` through ``load_to_bigquery``, then records the load
    via ``log_update`` and ``save_etl_metadata``.

    Args:
        df: Final per-row frame containing all output columns.

    Returns:
        The row count reported by ``load_to_bigquery``.
    """
    table_name = "agg_indicator_norm"
    log = self.logger.info

    log("\n" + "=" * 80)
    log(f"STEP 10: SAVE -> [Gold] {table_name}")
    log("=" * 80)

    integer_columns = ("year", "country_id", "indicator_id", "pillar_id")
    text_columns = (
        "country_name",
        "indicator_name",
        "indicator_name_id",
        "unit",
        "direction",
        "pillar_name",
        "pillar_name_id",
        "framework",
    )
    float_columns = ("value", "norm_value", "norm_score_1_100")
    yoy_columns = ("yoy_value", "yoy_norm_value")

    out = df[[
        "year", "country_id", "country_name",
        "indicator_id", "indicator_name", "indicator_name_id",
        "unit", "direction",
        "pillar_id", "pillar_name", "pillar_name_id",
        "framework",
        "value", "norm_value", "norm_score_1_100",
        "yoy_value", "yoy_norm_value", "performance",
    ]].copy()

    out = out.sort_values(
        ["year", "country_name", "pillar_name", "indicator_name"]
    ).reset_index(drop=True)

    for column in integer_columns:
        out[column] = out[column].astype(int)
    for column in text_columns:
        out[column] = out[column].astype(str)
    for column in float_columns:
        out[column] = out[column].astype(float)
    for column in yoy_columns:
        out[column] = pd.to_numeric(out[column], errors="coerce").astype(float)

    # Cast straight to the nullable string dtype.  Going through
    # ``astype(str)`` first would render ``pd.NA`` as the text "<NA>",
    # which a later ``replace("nan", ...)`` does not catch, so missing
    # labels would be loaded as the literal string "<NA>" instead of NULL.
    out["performance"] = out["performance"].astype("string")

    log(f" Total rows : {len(out):,}")
    log(f" Countries : {out['country_id'].nunique()}")
    log(f" Indicators : {out['indicator_id'].nunique()}")
    log(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")

    schema = [
        bigquery.SchemaField(field_name, field_type, mode=field_mode)
        for field_name, field_type, field_mode in (
            ("year", "INTEGER", "REQUIRED"),
            ("country_id", "INTEGER", "REQUIRED"),
            ("country_name", "STRING", "REQUIRED"),
            ("indicator_id", "INTEGER", "REQUIRED"),
            ("indicator_name", "STRING", "REQUIRED"),
            ("indicator_name_id", "STRING", "NULLABLE"),
            ("unit", "STRING", "NULLABLE"),
            ("direction", "STRING", "REQUIRED"),
            ("pillar_id", "INTEGER", "REQUIRED"),
            ("pillar_name", "STRING", "REQUIRED"),
            ("pillar_name_id", "STRING", "NULLABLE"),
            ("framework", "STRING", "REQUIRED"),
            ("value", "FLOAT", "REQUIRED"),
            ("norm_value", "FLOAT", "NULLABLE"),
            ("norm_score_1_100", "FLOAT", "NULLABLE"),
            ("yoy_value", "FLOAT", "NULLABLE"),
            ("yoy_norm_value", "FLOAT", "NULLABLE"),
            ("performance", "STRING", "NULLABLE"),
        )
    ]

    rows_loaded = load_to_bigquery(
        self.client, out, table_name,
        layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
    )

    log_update(self.client, "DW", table_name, "full_load", rows_loaded)
    log(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")

    metadata = {
        "source_class": self.__class__.__name__,
        "table_name": table_name,
        "execution_timestamp": self.pipeline_start,
        "duration_seconds": (datetime.now() - self.pipeline_start).total_seconds(),
        "rows_fetched": self.pipeline_metadata["rows_fetched"],
        "rows_transformed": rows_loaded,
        "rows_loaded": rows_loaded,
        "completeness_pct": 100.0,
        "config_snapshot": json.dumps({
            "sdgs_start_year": self.sdgs_start_year,
            "sdg_only_keywords_n": len(SDG_ONLY_KEYWORDS),
            "layer": "gold",
            "normalization": "per_indicator_global_minmax",
            "direction_handling": "lower_better_inverted",
            "yoy_columns": ["yoy_value", "yoy_norm_value"],
            "performance_threshold": _PERFORMANCE_THRESHOLD,
            "unit_source": "dim_indicator",
            "added_columns": ["indicator_name_id", "pillar_name_id"],
        }),
        "validation_metrics": json.dumps({
            "total_rows": rows_loaded,
            "n_indicators": int(out["indicator_id"].nunique()),
            "n_countries": int(out["country_id"].nunique()),
            "sdgs_start_year": self.sdgs_start_year,
        }),
    }
    save_etl_metadata(self.client, metadata)
    return rows_loaded
|
|
|
|
# =========================================================================
|
|
# STEP 11: Summary log
|
|
# =========================================================================
|
|
|
|
def _log_summary(self, df: pd.DataFrame):
    """Step 11: log the five best and five worst indicators.

    Indicators are ranked by their mean ``norm_score_1_100`` (descending).
    Each logged line shows the indicator id, the name truncated to 50
    characters, the mean score, and a tag that is "[lower+]" when the
    indicator's ``direction`` is in ``DIRECTION_INVERT_KEYWORDS`` and
    "[higher+]" otherwise.

    Args:
        df: Final per-row frame carrying ``norm_score_1_100``.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log("STEP 11: SUMMARY — agg_indicator_norm")
    log("=" * 80)

    mean_scores = df.groupby(
        ["indicator_id", "indicator_name", "pillar_name", "direction"]
    )["norm_score_1_100"].mean()
    ranking = mean_scores.reset_index().sort_values(
        "norm_score_1_100", ascending=False
    )

    sections = (
        ("\n TOP 5 Indicators (avg norm_score_1_100):", ranking.head(5)),
        ("\n BOTTOM 5 Indicators:", ranking.tail(5)),
    )
    for heading, subset in sections:
        log(heading)
        for entry in subset.itertuples(index=False):
            if entry.direction in DIRECTION_INVERT_KEYWORDS:
                tag = "[lower+]"
            else:
                tag = "[higher+]"
            log(
                f" [{int(entry.indicator_id)}] {entry.indicator_name[:50]:<52} "
                f"{entry.norm_score_1_100:.2f} {tag}"
            )
|
|
|
|
# =========================================================================
|
|
# STEP 12-17: agg_narrative_indicator
|
|
# =========================================================================
|
|
|
|
def _build_narrative_table(self, df_final: pd.DataFrame):
    """Steps 12-17: build and load ``agg_narrative_indicator``.

    Collapses the per-row frame to one record per ``indicator_id``:
    first/last yearly average, mean score, country count, year-over-year
    statistics and best/worst country.  A performance label and the
    bilingual narratives from ``_build_narrative_per_indicator`` are then
    attached.  The table is loaded with ``WRITE_TRUNCATE`` and the load is
    recorded via ``log_update`` and ``save_etl_metadata``.  The loaded row
    count is stored in ``self.pipeline_metadata["rows_loaded_narrative"]``.

    Args:
        df_final: Final per-row frame produced for ``agg_indicator_norm``.
    """
    log = self.logger.info
    log("\n" + "=" * 80)
    log("STEP 12-17: agg_narrative_indicator")
    log(" Granularity: per indicator_id (all years + all ASEAN countries)")
    log(" Narrative : interpretatif, plain text, bilingual EN/ID")
    log("=" * 80)

    df = df_final.copy()

    # ---- Yearly averages across countries, per indicator ----
    df_yr = (
        df.groupby(["indicator_id", "year"])
        .agg(
            avg_value=("value", "mean"),
            avg_norm_score=("norm_score_1_100", "mean"),
            n_countries_yr=("country_id", "nunique"),
        )
        .reset_index()
    )

    # GroupBy.first()/last() take the first/last non-null entry per column.
    yearly_by_indicator = df_yr.sort_values("year").groupby("indicator_id")
    df_first = (
        yearly_by_indicator.first().reset_index()
        [["indicator_id", "year", "avg_value"]]
        .rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
    )
    df_last = (
        yearly_by_indicator.last().reset_index()
        [["indicator_id", "year", "avg_value"]]
        .rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
    )
    df_score_avg = (
        df_yr.groupby("indicator_id")
        .agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
        .reset_index()
    )
    df_nc = (
        df.groupby("indicator_id")["country_id"]
        .nunique().reset_index()
        .rename(columns={"country_id": "n_countries"})
    )

    # ---- YoY statistics on the yearly averages ----
    dir_map = (
        df[["indicator_id", "direction"]]
        .drop_duplicates(subset=["indicator_id"])
        .set_index("indicator_id")["direction"]
        .to_dict()
    )

    # Difference to the previous available year of the same indicator;
    # NaN when either side is missing.
    df_yr = df_yr.sort_values(["indicator_id", "year"]).reset_index(drop=True)
    df_yr["yoy"] = df_yr.groupby("indicator_id")["avg_value"].diff()

    yoy_stats = []
    for ind_id, grp in df_yr.groupby("indicator_id"):
        grp_yoy = grp[grp["yoy"].notna()].copy()
        lb = _is_lower_better(dir_map.get(ind_id, "positive"))
        n_total = len(grp_yoy)
        # A change counts as positive when it moves in the favourable
        # direction: down when lower is better, up otherwise.
        improving = (grp_yoy["yoy"] < 0) if lb else (grp_yoy["yoy"] > 0)
        n_positive = int(improving.sum())

        if n_total > 0:
            idx_best = grp_yoy["yoy"].idxmin() if lb else grp_yoy["yoy"].idxmax()
            best_row = grp_yoy.loc[idx_best]
            # NOTE(review): assumes the preceding data point is exactly one
            # year earlier; with gaps in the yearly series the comparison
            # year differs from ``year - 1`` — confirm intended semantics.
            best_yoy_from = best_row["year"] - 1
            best_yoy_to = best_row["year"]
        else:
            best_yoy_from = np.nan
            best_yoy_to = np.nan

        yoy_stats.append({
            "indicator_id": ind_id,
            "n_yoy_total": n_total,
            "n_yoy_positive": n_positive,
            "best_yoy_from": best_yoy_from,
            "best_yoy_to": best_yoy_to,
        })
    df_yoy_stats = pd.DataFrame(yoy_stats)

    # ---- Best / worst country by mean value ----
    df_country_avg = (
        df.groupby(["indicator_id", "country_id", "country_name"])
        .agg(country_avg_value=("value", "mean"))
        .reset_index()
    )
    country_stats = []
    for ind_id, grp in df_country_avg.groupby("indicator_id"):
        lb = _is_lower_better(dir_map.get(ind_id, "positive"))
        idx_highest = grp["country_avg_value"].idxmax()
        idx_lowest = grp["country_avg_value"].idxmin()
        if lb:
            worst_row, best_row = grp.loc[idx_highest], grp.loc[idx_lowest]
        else:
            worst_row, best_row = grp.loc[idx_lowest], grp.loc[idx_highest]
        country_stats.append({
            "indicator_id": ind_id,
            "country_worst": worst_row["country_name"],
            "country_best": best_row["country_name"],
        })
    df_country_stats = pd.DataFrame(country_stats)

    # ---- Descriptive columns (including the Indonesian names) ----
    dim_cols = [
        "indicator_name", "indicator_name_id",
        "unit", "direction",
        "pillar_name", "pillar_name_id",
        "framework",
    ]
    # The first row seen for each indicator supplies its descriptive values.
    df_dim = df[["indicator_id"] + dim_cols].drop_duplicates(subset=["indicator_id"])

    # ---- Combine all per-indicator pieces ----
    df_agg = df_dim
    for piece in (
        df_first, df_last, df_score_avg, df_nc, df_yoy_stats, df_country_stats
    ):
        df_agg = df_agg.merge(piece, on="indicator_id", how="left")

    # ---- Performance label on the mean score ----
    df_agg["performance"] = pd.NA
    avg_score = df_agg["avg_norm_score_1_100"]
    has_score = avg_score.notna()
    df_agg.loc[has_score & (avg_score >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
    df_agg.loc[has_score & (avg_score < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"

    # ---- Build narrative ----
    log("\n--- BUILD NARRATIVE (interpretatif, plain text, bilingual EN/ID) ---")
    narratives_en = []
    narratives_id = []
    for _, row in df_agg.iterrows():
        n_en, n_id = _build_narrative_per_indicator(row, df)
        narratives_en.append(n_en)
        narratives_id.append(n_id)

    df_agg["narrative_en"] = narratives_en
    df_agg["narrative_id"] = narratives_id

    log(f" Narratives generated: {len(df_agg):,}")
    log("\n Sample EN (first):")
    log(f" {df_agg.iloc[0]['narrative_en'][:300]}")
    log("\n Sample ID (first):")
    log(f" {df_agg.iloc[0]['narrative_id'][:300]}")

    # ---- Save ----
    out = df_agg[[
        "indicator_id", "indicator_name", "indicator_name_id",
        "unit", "direction",
        "pillar_name", "pillar_name_id",
        "framework",
        "year_min", "year_max", "n_countries",
        "avg_value_first", "avg_value_last",
        "avg_norm_score_1_100", "performance",
        "n_yoy_total", "n_yoy_positive",
        "best_yoy_from", "best_yoy_to",
        "country_worst", "country_best",
        "narrative_en", "narrative_id",
    ]].copy()

    out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)

    out["indicator_id"] = out["indicator_id"].astype(int)
    out["unit"] = out["unit"].fillna("").astype(str)
    for column in (
        "indicator_name", "indicator_name_id", "direction",
        "pillar_name", "pillar_name_id", "framework",
        "narrative_en", "narrative_id",
    ):
        out[column] = out[column].astype(str)
    for column in ("year_min", "year_max", "n_countries"):
        out[column] = out[column].astype(int)
    for column in ("avg_value_first", "avg_value_last", "avg_norm_score_1_100"):
        out[column] = pd.to_numeric(out[column], errors="coerce").astype(float)
    for column in ("n_yoy_total", "n_yoy_positive", "best_yoy_from", "best_yoy_to"):
        out[column] = pd.to_numeric(out[column], errors="coerce").astype("Int64")
    # Cast straight to the nullable string dtype.  Going through
    # ``astype(str)`` first would render ``pd.NA`` as the text "<NA>",
    # which ``replace("nan", ...)`` does not catch, so a missing label would
    # be loaded as the literal string "<NA>" instead of NULL.
    for column in ("performance", "country_worst", "country_best"):
        out[column] = out[column].astype("string")

    schema = [
        bigquery.SchemaField(field_name, field_type, mode=field_mode)
        for field_name, field_type, field_mode in (
            ("indicator_id", "INTEGER", "REQUIRED"),
            ("indicator_name", "STRING", "REQUIRED"),
            ("indicator_name_id", "STRING", "NULLABLE"),
            ("unit", "STRING", "NULLABLE"),
            ("direction", "STRING", "REQUIRED"),
            ("pillar_name", "STRING", "REQUIRED"),
            ("pillar_name_id", "STRING", "NULLABLE"),
            ("framework", "STRING", "REQUIRED"),
            ("year_min", "INTEGER", "REQUIRED"),
            ("year_max", "INTEGER", "REQUIRED"),
            ("n_countries", "INTEGER", "REQUIRED"),
            ("avg_value_first", "FLOAT", "NULLABLE"),
            ("avg_value_last", "FLOAT", "NULLABLE"),
            ("avg_norm_score_1_100", "FLOAT", "NULLABLE"),
            ("performance", "STRING", "NULLABLE"),
            ("n_yoy_total", "INTEGER", "NULLABLE"),
            ("n_yoy_positive", "INTEGER", "NULLABLE"),
            ("best_yoy_from", "INTEGER", "NULLABLE"),
            ("best_yoy_to", "INTEGER", "NULLABLE"),
            ("country_worst", "STRING", "NULLABLE"),
            ("country_best", "STRING", "NULLABLE"),
            ("narrative_en", "STRING", "NULLABLE"),
            ("narrative_id", "STRING", "NULLABLE"),
        )
    ]

    rows_loaded = load_to_bigquery(
        self.client, out, "agg_narrative_indicator",
        layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
    )

    log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded)
    log(f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold")

    metadata = {
        "source_class": self.__class__.__name__,
        "table_name": "agg_narrative_indicator",
        "execution_timestamp": self.pipeline_start,
        "duration_seconds": (datetime.now() - self.pipeline_start).total_seconds(),
        "rows_fetched": self.pipeline_metadata["rows_fetched"],
        "rows_transformed": rows_loaded,
        "rows_loaded": rows_loaded,
        "completeness_pct": 100.0,
        "config_snapshot": json.dumps({
            "source_table": "agg_indicator_norm (in-memory df_final)",
            "granularity": "indicator_id only (all years, all ASEAN countries)",
            "narrative_style": "interpretive, plain text, no markdown, bilingual EN/ID",
            "narrative_dimensions": ["trend", "gap_trend", "anomaly", "country_consistency"],
            "performance_threshold": _PERFORMANCE_THRESHOLD,
            "layer": "gold",
            "added_columns": ["indicator_name_id", "pillar_name_id"],
        }),
        "validation_metrics": json.dumps({
            "total_rows": rows_loaded,
            "n_indicators": int(out["indicator_id"].nunique()),
        }),
    }
    save_etl_metadata(self.client, metadata)
    self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
|
|
|
|
# =========================================================================
|
|
# RUN
|
|
# =========================================================================
|
|
|
|
def run(self):
    """Execute the whole aggregation pipeline from load to narrative table.

    Order of work: load the data and units, merge the unit, add the
    Indonesian name columns, detect ``sdgs_start_year``, assign the
    framework, compute norm values, YoY columns, scores and performance
    labels, save ``agg_indicator_norm``, log the summary, then build and
    save ``agg_narrative_indicator``.  Start/end times and loaded row
    counts are written to ``self.pipeline_metadata``.
    """
    log = self.logger.info
    rule = "=" * 80

    self.pipeline_start = datetime.now()
    self.pipeline_metadata["start_time"] = self.pipeline_start

    for line in (
        "\n" + rule,
        "INDICATOR NORM AGGREGATION",
        " Source : fact_asean_food_security_selected",
        " Dim : dim_indicator (unit)",
        " Output : agg_indicator_norm -> fs_asean_gold",
        " agg_narrative_indicator -> fs_asean_gold",
        " Added : indicator_name_id, pillar_name_id (Bahasa Indonesia)",
        rule,
    ):
        log(line)

    # Preparation steps mutate self.df in place.
    self.load_data()
    self.load_units()
    self._merge_unit()
    self._add_indonesia_name_columns()
    self.sdgs_start_year = self._detect_sdgs_start_year()
    self._assign_framework()

    # Each stage takes the previous stage's frame and returns a new one.
    df_final = self._compute_norm_values()
    for stage in (
        self._compute_yoy_columns,
        self._compute_scores,
        self._assign_performance,
    ):
        df_final = stage(df_final)

    rows_loaded = self._save(df_final)
    self.pipeline_metadata["rows_loaded"] = rows_loaded
    self._log_summary(df_final)
    self._build_narrative_table(df_final)

    self.pipeline_metadata["end_time"] = datetime.now()
    elapsed = self.pipeline_metadata["end_time"] - self.pipeline_start
    duration = elapsed.total_seconds()

    log("\n" + rule)
    log("COMPLETED")
    log(rule)
    log(f" Duration : {duration:.2f}s")
    log(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}")
    log(f" Rows Loaded (norm) : {rows_loaded:,}")
    log(f" Rows Loaded (narrative) : {self.pipeline_metadata['rows_loaded_narrative']:,}")
    log(f" sdgs_start_year : {self.sdgs_start_year}")
|
|
|
|
|
|
# =============================================================================
|
|
# AIRFLOW TASK
|
|
# =============================================================================
|
|
|
|
def run_indicator_norm_aggregation():
    """Airflow task: run the aggregator and print the loaded row counts.

    Creates a BigQuery client with ``get_bigquery_client``, runs
    ``IndicatorNormAggregator`` on it and prints how many rows were loaded
    into ``agg_indicator_norm`` and ``agg_narrative_indicator``.
    """
    aggregator = IndicatorNormAggregator(get_bigquery_client())
    aggregator.run()

    counts = aggregator.pipeline_metadata
    print(f"agg_indicator_norm loaded : {counts['rows_loaded']:,} rows")
    print(f"agg_narrative_indicator loaded: {counts['rows_loaded_narrative']:,} rows")
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
    import io
    import sys

    # Re-wrap stdout/stderr as UTF-8 (replacing unencodable characters)
    # whenever the stream reports a different encoding.
    for _stream_name in ("stdout", "stderr"):
        _stream = getattr(sys, _stream_name)
        if _stream.encoding and _stream.encoding.lower() not in ("utf-8", "utf8"):
            setattr(
                sys,
                _stream_name,
                io.TextIOWrapper(_stream.buffer, encoding="utf-8", errors="replace"),
            )

    _rule = "=" * 80
    print(_rule)
    print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
    print(_rule)

    logger = setup_logging()
    client = get_bigquery_client()
    agg = IndicatorNormAggregator(client)
    agg.run()

    print("\n" + _rule)
    print("[OK] COMPLETED")
    print(_rule)