create colomn indonesian text
This commit is contained in:
@@ -32,21 +32,22 @@ Output Schema (agg_indicator_norm):
|
|||||||
year, country_id, country_name,
|
year, country_id, country_name,
|
||||||
indicator_id, indicator_name, unit, direction,
|
indicator_id, indicator_name, unit, direction,
|
||||||
pillar_id, pillar_name,
|
pillar_id, pillar_name,
|
||||||
framework, -- "MDGs" | "SDGs"
|
framework,
|
||||||
value, -- raw value asli
|
value,
|
||||||
norm_value, -- 0-1, direction sudah diperhitungkan
|
norm_value,
|
||||||
norm_score_1_100, -- scaled 1-100 (global per indikator)
|
norm_score_1_100,
|
||||||
yoy_value, -- perubahan absolut value YoY
|
yoy_value,
|
||||||
yoy_norm_value, -- perubahan absolut norm_value YoY
|
yoy_norm_value,
|
||||||
performance -- "Good" | "Bad" | null
|
performance
|
||||||
|
|
||||||
=============================================================================
|
=============================================================================
|
||||||
agg_narrative_indicator
|
agg_narrative_indicator
|
||||||
=============================================================================
|
=============================================================================
|
||||||
Tujuan:
|
Tujuan:
|
||||||
Menghasilkan narasi otomatis 1 paragraf per indikator (level ASEAN,
|
Menghasilkan narasi otomatis per indikator (granularity: indicator_id).
|
||||||
merangkum seluruh periode + seluruh negara), dijalankan otomatis setelah
|
Narasi membaca kondisi nyata dari data: tren, gap, anomali, konsistensi.
|
||||||
agg_indicator_norm selesai dalam pipeline yang sama.
|
Tersedia dalam dua bahasa: Inggris (narrative_en) dan Indonesia (narrative_id).
|
||||||
|
Tanpa markdown bold (**) agar aman ditampilkan di Looker Studio.
|
||||||
|
|
||||||
Granularity:
|
Granularity:
|
||||||
indicator_id (all years, all ASEAN countries)
|
indicator_id (all years, all ASEAN countries)
|
||||||
@@ -57,11 +58,12 @@ Output Schema (agg_narrative_indicator):
|
|||||||
year_min, year_max, n_countries,
|
year_min, year_max, n_countries,
|
||||||
avg_value_first, avg_value_last,
|
avg_value_first, avg_value_last,
|
||||||
avg_norm_score_1_100,
|
avg_norm_score_1_100,
|
||||||
performance, -- Good | Bad | null
|
performance,
|
||||||
n_yoy_total, n_yoy_positive,
|
n_yoy_total, n_yoy_positive,
|
||||||
best_yoy_from, best_yoy_to,
|
best_yoy_from, best_yoy_to,
|
||||||
country_worst, country_best,
|
country_worst, country_best,
|
||||||
narrative
|
narrative_en,
|
||||||
|
narrative_id
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@@ -86,10 +88,8 @@ from google.cloud import bigquery
|
|||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
||||||
# TARGET 2.1.1 - Undernourishment
|
|
||||||
"prevalence of undernourishment (percent) (3-year average)",
|
"prevalence of undernourishment (percent) (3-year average)",
|
||||||
"number of people undernourished (million) (3-year average)",
|
"number of people undernourished (million) (3-year average)",
|
||||||
# TARGET 2.1.2 - Food Insecurity (FIES)
|
|
||||||
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
||||||
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
|
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
|
||||||
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
|
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
|
||||||
@@ -102,16 +102,12 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
|||||||
"number of moderately or severely food insecure people (million) (3-year average)",
|
"number of moderately or severely food insecure people (million) (3-year average)",
|
||||||
"number of moderately or severely food insecure male adults (million) (3-year average)",
|
"number of moderately or severely food insecure male adults (million) (3-year average)",
|
||||||
"number of moderately or severely food insecure female adults (million) (3-year average)",
|
"number of moderately or severely food insecure female adults (million) (3-year average)",
|
||||||
# TARGET 2.2.1 - Stunting
|
|
||||||
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
|
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
|
||||||
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
|
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
|
||||||
# TARGET 2.2.2 - Wasting
|
|
||||||
"percentage of children under 5 years affected by wasting (percent)",
|
"percentage of children under 5 years affected by wasting (percent)",
|
||||||
"number of children under 5 years affected by wasting (million)",
|
"number of children under 5 years affected by wasting (million)",
|
||||||
# TARGET 2.2.2 - Overweight (children)
|
|
||||||
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
|
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
|
||||||
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
|
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
|
||||||
# TARGET 2.2.3 - Anaemia
|
|
||||||
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
|
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
|
||||||
"number of women of reproductive age (15-49 years) affected by anemia (million)",
|
"number of women of reproductive age (15-49 years) affected by anemia (million)",
|
||||||
])
|
])
|
||||||
@@ -168,28 +164,19 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S
|
|||||||
|
|
||||||
|
|
||||||
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
||||||
"""
|
|
||||||
Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year.
|
|
||||||
Kolom yang ditambahkan: yoy_value, yoy_norm_value.
|
|
||||||
Baris pertama tiap grup selalu null.
|
|
||||||
"""
|
|
||||||
df = df.sort_values("year").copy()
|
df = df.sort_values("year").copy()
|
||||||
|
|
||||||
df["value_prev"] = df["value"].shift(1)
|
df["value_prev"] = df["value"].shift(1)
|
||||||
df["norm_value_prev"] = df["norm_value"].shift(1)
|
df["norm_value_prev"] = df["norm_value"].shift(1)
|
||||||
|
|
||||||
df["yoy_value"] = np.where(
|
df["yoy_value"] = np.where(
|
||||||
df["value"].notna() & df["value_prev"].notna(),
|
df["value"].notna() & df["value_prev"].notna(),
|
||||||
df["value"] - df["value_prev"],
|
df["value"] - df["value_prev"],
|
||||||
np.nan,
|
np.nan,
|
||||||
)
|
)
|
||||||
|
|
||||||
df["yoy_norm_value"] = np.where(
|
df["yoy_norm_value"] = np.where(
|
||||||
df["norm_value"].notna() & df["norm_value_prev"].notna(),
|
df["norm_value"].notna() & df["norm_value_prev"].notna(),
|
||||||
df["norm_value"] - df["norm_value_prev"],
|
df["norm_value"] - df["norm_value_prev"],
|
||||||
np.nan,
|
np.nan,
|
||||||
)
|
)
|
||||||
|
|
||||||
df = df.drop(columns=["value_prev", "norm_value_prev"])
|
df = df.drop(columns=["value_prev", "norm_value_prev"])
|
||||||
return df
|
return df
|
||||||
|
|
||||||
@@ -199,18 +186,163 @@ def _is_lower_better(direction: str) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# NARRATIVE BUILDER — agg_narrative_indicator
|
# NARRATIVE CONDITION DETECTORS
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def _build_narrative_per_indicator(row: pd.Series) -> str:
|
def _detect_trend(scores_by_year: pd.Series, lower_better: bool) -> str:
|
||||||
"""
|
"""
|
||||||
Narrative format (no em-dash, bold on key figures):
|
Deteksi tren: improving_consistent, improving_slowing, fluctuating, deteriorating.
|
||||||
**{indicator}** ({framework}, {pillar}): ASEAN average {rose/fell} from
|
scores_by_year: Series dengan index=year, value=avg_score (sudah direction-aware).
|
||||||
**{first}** to **{last}** ({year_min} to {year_max}), **{improving/deteriorating}** trend.
|
|
||||||
Score: **{score}/100** (*{Good/Bad}*).
|
|
||||||
Best country: **{best}**; worst: **{worst}**.
|
|
||||||
Improved in **{n_pos}/{n_total}** YoY transitions.
|
|
||||||
"""
|
"""
|
||||||
|
if len(scores_by_year) < 3:
|
||||||
|
return "insufficient_data"
|
||||||
|
|
||||||
|
years = sorted(scores_by_year.index)
|
||||||
|
vals = [scores_by_year[y] for y in years if not pd.isna(scores_by_year.get(y, np.nan))]
|
||||||
|
|
||||||
|
if len(vals) < 3:
|
||||||
|
return "insufficient_data"
|
||||||
|
|
||||||
|
# Hitung slope keseluruhan
|
||||||
|
x = np.arange(len(vals))
|
||||||
|
slope = np.polyfit(x, vals, 1)[0]
|
||||||
|
|
||||||
|
# Slope positif = skor naik = baik untuk higher_better, buruk untuk lower_better
|
||||||
|
improving = (slope > 0 and not lower_better) or (slope < 0 and lower_better)
|
||||||
|
|
||||||
|
# Hitung apakah laju melambat: bandingkan slope paruh pertama vs paruh kedua
|
||||||
|
mid = len(vals) // 2
|
||||||
|
first_half = vals[:mid]
|
||||||
|
second_half = vals[mid:]
|
||||||
|
slope1 = np.polyfit(np.arange(len(first_half)), first_half, 1)[0] if len(first_half) > 1 else 0
|
||||||
|
slope2 = np.polyfit(np.arange(len(second_half)), second_half, 1)[0] if len(second_half) > 1 else 0
|
||||||
|
|
||||||
|
# Koefisien variasi untuk cek fluktuasi
|
||||||
|
cv = np.std(vals) / (np.mean(vals) + 1e-9)
|
||||||
|
|
||||||
|
if cv > 0.25:
|
||||||
|
return "fluctuating"
|
||||||
|
|
||||||
|
if improving:
|
||||||
|
# Cek apakah melambat
|
||||||
|
if lower_better:
|
||||||
|
slowing = slope2 > slope1 # slope negatif mengecil artinya melambat
|
||||||
|
else:
|
||||||
|
slowing = slope2 < slope1 # slope positif mengecil artinya melambat
|
||||||
|
return "improving_slowing" if slowing else "improving_consistent"
|
||||||
|
else:
|
||||||
|
return "deteriorating"
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_gap_trend(df_ind: pd.DataFrame, lower_better: bool) -> str:
|
||||||
|
"""
|
||||||
|
Deteksi apakah gap antar negara melebar, menyempit, atau stabil.
|
||||||
|
df_ind: rows untuk 1 indikator, kolom: year, country_id, value
|
||||||
|
"""
|
||||||
|
std_by_year = (
|
||||||
|
df_ind.groupby("year")["value"]
|
||||||
|
.std()
|
||||||
|
.dropna()
|
||||||
|
)
|
||||||
|
if len(std_by_year) < 3:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
years = sorted(std_by_year.index)
|
||||||
|
stds = [std_by_year[y] for y in years]
|
||||||
|
slope = np.polyfit(np.arange(len(stds)), stds, 1)[0]
|
||||||
|
|
||||||
|
if abs(slope) < 0.01 * np.mean(stds):
|
||||||
|
return "stable"
|
||||||
|
return "widening" if slope > 0 else "narrowing"
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_anomaly_year(scores_by_year: pd.Series) -> tuple:
|
||||||
|
"""
|
||||||
|
Deteksi tahun dengan perubahan paling ekstrem (naik atau turun tajam).
|
||||||
|
Return: (anomaly_year, direction) atau (None, None)
|
||||||
|
"""
|
||||||
|
if len(scores_by_year) < 3:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
years = sorted(scores_by_year.index)
|
||||||
|
deltas = {}
|
||||||
|
for i in range(1, len(years)):
|
||||||
|
y_prev = years[i - 1]
|
||||||
|
y_curr = years[i]
|
||||||
|
v_prev = scores_by_year.get(y_prev, np.nan)
|
||||||
|
v_curr = scores_by_year.get(y_curr, np.nan)
|
||||||
|
if not pd.isna(v_prev) and not pd.isna(v_curr):
|
||||||
|
deltas[y_curr] = v_curr - v_prev
|
||||||
|
|
||||||
|
if not deltas:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
max_drop_year = min(deltas, key=deltas.get)
|
||||||
|
max_rise_year = max(deltas, key=deltas.get)
|
||||||
|
|
||||||
|
threshold = 1.5 * np.std(list(deltas.values()))
|
||||||
|
if abs(deltas[max_drop_year]) > threshold and deltas[max_drop_year] < 0:
|
||||||
|
return max_drop_year, "drop"
|
||||||
|
if abs(deltas[max_rise_year]) > threshold and deltas[max_rise_year] > 0:
|
||||||
|
return max_rise_year, "rise"
|
||||||
|
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple:
|
||||||
|
"""
|
||||||
|
Cari negara yang paling konsisten terbaik dan terburuk.
|
||||||
|
Return: (consistent_best, consistent_worst, is_consistent)
|
||||||
|
"""
|
||||||
|
country_avg = (
|
||||||
|
df_ind.groupby("country_name")["value"]
|
||||||
|
.mean()
|
||||||
|
.dropna()
|
||||||
|
)
|
||||||
|
if country_avg.empty:
|
||||||
|
return None, None, False
|
||||||
|
|
||||||
|
if lower_better:
|
||||||
|
best = country_avg.idxmin()
|
||||||
|
worst = country_avg.idxmax()
|
||||||
|
else:
|
||||||
|
best = country_avg.idxmax()
|
||||||
|
worst = country_avg.idxmin()
|
||||||
|
|
||||||
|
# Cek konsistensi: apakah negara terbaik selalu di atas rata-rata?
|
||||||
|
asean_avg_by_year = df_ind.groupby("year")["value"].mean()
|
||||||
|
country_by_year = df_ind[df_ind["country_name"] == best].set_index("year")["value"]
|
||||||
|
|
||||||
|
years_both = set(asean_avg_by_year.index) & set(country_by_year.index)
|
||||||
|
if not years_both:
|
||||||
|
return best, worst, False
|
||||||
|
|
||||||
|
if lower_better:
|
||||||
|
consistent = all(
|
||||||
|
country_by_year[y] <= asean_avg_by_year[y]
|
||||||
|
for y in years_both
|
||||||
|
if not pd.isna(country_by_year.get(y, np.nan))
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
consistent = all(
|
||||||
|
country_by_year[y] >= asean_avg_by_year[y]
|
||||||
|
for y in years_both
|
||||||
|
if not pd.isna(country_by_year.get(y, np.nan))
|
||||||
|
)
|
||||||
|
|
||||||
|
return best, worst, consistent
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# NARRATIVE BUILDER — plain text, no markdown, bilingual
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tuple:
|
||||||
|
"""
|
||||||
|
Bangun narasi interpretatif per indikator berdasarkan kondisi nyata data.
|
||||||
|
Return: (narrative_en, narrative_id) — plain text tanpa markdown bold.
|
||||||
|
"""
|
||||||
|
ind_id = int(row["indicator_id"])
|
||||||
ind_name = str(row["indicator_name"]).strip()
|
ind_name = str(row["indicator_name"]).strip()
|
||||||
unit = str(row["unit"]).strip() if row["unit"] else ""
|
unit = str(row["unit"]).strip() if row["unit"] else ""
|
||||||
direction = str(row["direction"]).strip()
|
direction = str(row["direction"]).strip()
|
||||||
@@ -218,85 +350,109 @@ def _build_narrative_per_indicator(row: pd.Series) -> str:
|
|||||||
framework = str(row["framework"]).strip()
|
framework = str(row["framework"]).strip()
|
||||||
year_min = int(row["year_min"])
|
year_min = int(row["year_min"])
|
||||||
year_max = int(row["year_max"])
|
year_max = int(row["year_max"])
|
||||||
n_countries = int(row["n_countries"])
|
|
||||||
avg_score = row["avg_norm_score_1_100"]
|
|
||||||
performance = row["performance"]
|
|
||||||
|
|
||||||
avg_first = row["avg_value_first"]
|
|
||||||
avg_last = row["avg_value_last"]
|
|
||||||
|
|
||||||
n_yoy_total = int(row["n_yoy_total"]) if not pd.isna(row["n_yoy_total"]) else 0
|
|
||||||
n_yoy_positive = int(row["n_yoy_positive"]) if not pd.isna(row["n_yoy_positive"]) else 0
|
|
||||||
|
|
||||||
best_yoy_from = row["best_yoy_from"]
|
|
||||||
best_yoy_to = row["best_yoy_to"]
|
|
||||||
|
|
||||||
country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None
|
|
||||||
country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None
|
|
||||||
|
|
||||||
lower_better = _is_lower_better(direction)
|
lower_better = _is_lower_better(direction)
|
||||||
|
|
||||||
def _fmt(v):
|
# Subset data untuk indikator ini
|
||||||
|
df_ind = df_full[df_full["indicator_id"] == ind_id].copy()
|
||||||
|
|
||||||
|
if df_ind.empty:
|
||||||
|
na_en = f"{ind_name} ({framework}, {pillar}): Insufficient data for analysis."
|
||||||
|
na_id = f"{ind_name} ({framework}, {pillar}): Data tidak cukup untuk dianalisis."
|
||||||
|
return na_en, na_id
|
||||||
|
|
||||||
|
# ---- Hitung kondisi dari data ----
|
||||||
|
asean_avg_by_year = (
|
||||||
|
df_ind.groupby("year")["value"].mean().dropna()
|
||||||
|
)
|
||||||
|
|
||||||
|
trend_label = _detect_trend(asean_avg_by_year, lower_better)
|
||||||
|
gap_label = _detect_gap_trend(df_ind, lower_better)
|
||||||
|
anomaly_year, anomaly_dir = _detect_anomaly_year(asean_avg_by_year)
|
||||||
|
best_country, worst_country, is_consistent = _detect_consistency(df_ind, lower_better)
|
||||||
|
|
||||||
|
avg_first = row.get("avg_value_first", np.nan)
|
||||||
|
avg_last = row.get("avg_value_last", np.nan)
|
||||||
|
|
||||||
|
def fmt(v):
|
||||||
if pd.isna(v):
|
if pd.isna(v):
|
||||||
return "N/A"
|
return "N/A"
|
||||||
abs_v = abs(v)
|
abs_v = abs(v)
|
||||||
if abs_v >= 1000:
|
s = f"{v:,.1f}" if abs_v >= 1000 else (f"{v:.2f}" if abs_v >= 10 else f"{v:.3f}")
|
||||||
s = f"{v:,.1f}"
|
|
||||||
elif abs_v >= 10:
|
|
||||||
s = f"{v:.2f}"
|
|
||||||
else:
|
|
||||||
s = f"{v:.3f}"
|
|
||||||
return f"{s} {unit}".strip() if unit else s
|
return f"{s} {unit}".strip() if unit else s
|
||||||
|
|
||||||
# Sentence 1: trend first -> last
|
# ---- Bangun kalimat EN ----
|
||||||
if not pd.isna(avg_first) and not pd.isna(avg_last):
|
sentences_en = []
|
||||||
diff = avg_last - avg_first
|
sentences_id = []
|
||||||
is_improving = (diff < 0) if lower_better else (diff > 0)
|
|
||||||
trend_label = "improving" if is_improving else "deteriorating"
|
# Kalimat 1: konteks indikator
|
||||||
verb = "fell" if diff < 0 else "rose"
|
s1_en = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):"
|
||||||
sent1 = (
|
s1_id = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):"
|
||||||
f"**{ind_name}** ({framework}, {pillar}): ASEAN average {verb} from "
|
sentences_en.append(s1_en)
|
||||||
f"**{_fmt(avg_first)}** to **{_fmt(avg_last)}** ({year_min} to {year_max}), "
|
sentences_id.append(s1_id)
|
||||||
f"**{trend_label}** trend."
|
|
||||||
|
# Kalimat 2: tren keseluruhan
|
||||||
|
trend_map_en = {
|
||||||
|
"improving_consistent": f"Regional average improved consistently from {fmt(avg_first)} to {fmt(avg_last)}.",
|
||||||
|
"improving_slowing": f"Regional average improved from {fmt(avg_first)} to {fmt(avg_last)}, though the pace slowed in recent years.",
|
||||||
|
"deteriorating": f"Regional average worsened from {fmt(avg_first)} to {fmt(avg_last)} over the period.",
|
||||||
|
"fluctuating": f"Regional average fluctuated between {fmt(avg_first)} and {fmt(avg_last)} with no clear trend.",
|
||||||
|
"insufficient_data": f"Trend analysis is limited due to sparse data.",
|
||||||
|
}
|
||||||
|
trend_map_id = {
|
||||||
|
"improving_consistent": f"Rata-rata regional membaik secara konsisten dari {fmt(avg_first)} menjadi {fmt(avg_last)}.",
|
||||||
|
"improving_slowing": f"Rata-rata regional membaik dari {fmt(avg_first)} menjadi {fmt(avg_last)}, namun lajunya melambat dalam beberapa tahun terakhir.",
|
||||||
|
"deteriorating": f"Rata-rata regional memburuk dari {fmt(avg_first)} menjadi {fmt(avg_last)} sepanjang periode.",
|
||||||
|
"fluctuating": f"Rata-rata regional berfluktuasi antara {fmt(avg_first)} dan {fmt(avg_last)} tanpa tren yang jelas.",
|
||||||
|
"insufficient_data": f"Analisis tren terbatas karena data yang tersedia tidak cukup.",
|
||||||
|
}
|
||||||
|
sentences_en.append(trend_map_en.get(trend_label, ""))
|
||||||
|
sentences_id.append(trend_map_id.get(trend_label, ""))
|
||||||
|
|
||||||
|
# Kalimat 3: gap antar negara
|
||||||
|
if gap_label == "widening":
|
||||||
|
sentences_en.append("Disparity among ASEAN countries has widened over time, indicating unequal progress.")
|
||||||
|
sentences_id.append("Kesenjangan antar negara ASEAN melebar seiring waktu, menunjukkan kemajuan yang tidak merata.")
|
||||||
|
elif gap_label == "narrowing":
|
||||||
|
sentences_en.append("Disparity among ASEAN countries has narrowed, suggesting more balanced regional progress.")
|
||||||
|
sentences_id.append("Kesenjangan antar negara ASEAN menyempit, mengindikasikan kemajuan regional yang lebih merata.")
|
||||||
|
elif gap_label == "stable":
|
||||||
|
sentences_en.append("The gap among ASEAN countries remained relatively stable throughout the period.")
|
||||||
|
sentences_id.append("Kesenjangan antar negara ASEAN relatif stabil sepanjang periode.")
|
||||||
|
|
||||||
|
# Kalimat 4: anomali
|
||||||
|
if anomaly_year is not None:
|
||||||
|
if anomaly_dir == "drop":
|
||||||
|
sentences_en.append(f"A notable decline was recorded in {anomaly_year}, which stood out from the overall pattern.")
|
||||||
|
sentences_id.append(f"Penurunan signifikan tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.")
|
||||||
|
elif anomaly_dir == "rise":
|
||||||
|
sentences_en.append(f"A sharp improvement was observed in {anomaly_year}, standing out from the overall pattern.")
|
||||||
|
sentences_id.append(f"Peningkatan tajam tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.")
|
||||||
|
|
||||||
|
# Kalimat 5: konsistensi negara terbaik/terburuk
|
||||||
|
if best_country and worst_country:
|
||||||
|
if is_consistent:
|
||||||
|
sentences_en.append(
|
||||||
|
f"{best_country} consistently performed above the regional average, "
|
||||||
|
f"while {worst_country} consistently lagged behind."
|
||||||
|
)
|
||||||
|
sentences_id.append(
|
||||||
|
f"{best_country} secara konsisten berada di atas rata-rata regional, "
|
||||||
|
f"sementara {worst_country} secara konsisten tertinggal."
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
sent1 = (
|
sentences_en.append(
|
||||||
f"**{ind_name}** ({framework}, {pillar}): trend data unavailable "
|
f"Overall, {best_country} showed the best performance, "
|
||||||
f"({year_min} to {year_max}, {n_countries} members)."
|
f"while {worst_country} had the weakest results across the period."
|
||||||
|
)
|
||||||
|
sentences_id.append(
|
||||||
|
f"Secara keseluruhan, {best_country} menunjukkan performa terbaik, "
|
||||||
|
f"sementara {worst_country} memiliki hasil terlemah sepanjang periode."
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sentence 2: score + performance
|
narrative_en = " ".join(s for s in sentences_en if s)
|
||||||
if not pd.isna(avg_score):
|
narrative_id = " ".join(s for s in sentences_id if s)
|
||||||
perf_label = f"*{performance}*" if performance in ("Good", "Bad") else ""
|
|
||||||
sent2 = f"Score: **{avg_score:.1f}/100** {perf_label}.".strip()
|
|
||||||
else:
|
|
||||||
sent2 = "Score unavailable."
|
|
||||||
|
|
||||||
# Sentence 3: best / worst country
|
return narrative_en, narrative_id
|
||||||
if country_best and country_worst and country_best != country_worst:
|
|
||||||
sent3 = f"Best country: **{country_best}**; worst: **{country_worst}**."
|
|
||||||
elif country_best:
|
|
||||||
sent3 = f"Best country: **{country_best}**."
|
|
||||||
else:
|
|
||||||
sent3 = ""
|
|
||||||
|
|
||||||
# Sentence 4: YoY transitions
|
|
||||||
if n_yoy_total > 0:
|
|
||||||
best_period = ""
|
|
||||||
if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to):
|
|
||||||
best_period = f", best gain: **{int(best_yoy_from)} to {int(best_yoy_to)}**"
|
|
||||||
sent4 = (
|
|
||||||
f"Improved in **{n_yoy_positive}/{n_yoy_total}** YoY transitions{best_period}."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sent4 = "Insufficient data for YoY assessment."
|
|
||||||
|
|
||||||
parts = [sent1, sent2]
|
|
||||||
if sent3:
|
|
||||||
parts.append(sent3)
|
|
||||||
parts.append(sent4)
|
|
||||||
|
|
||||||
return " ".join(parts)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
@@ -308,27 +464,6 @@ class IndicatorNormAggregator:
|
|||||||
Hitung norm_value per indikator untuk seluruh data di
|
Hitung norm_value per indikator untuk seluruh data di
|
||||||
fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm.
|
fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm.
|
||||||
Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator.
|
Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator.
|
||||||
|
|
||||||
Alur agg_indicator_norm:
|
|
||||||
1. Load fact_asean_food_security_selected
|
|
||||||
2. Load dim_indicator -> ambil kolom unit
|
|
||||||
3. Merge unit ke df
|
|
||||||
4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data)
|
|
||||||
5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label
|
|
||||||
6. Hitung norm_value per indikator (direction-aware, 0-1)
|
|
||||||
7. Hitung YoY per (indicator_id, country_id)
|
|
||||||
8. Scale ke 1-100 per indikator (global)
|
|
||||||
9. Assign performance label (Good/Bad)
|
|
||||||
10. Simpan ke BigQuery -> agg_indicator_norm
|
|
||||||
11. Summary log agg_indicator_norm
|
|
||||||
|
|
||||||
Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada):
|
|
||||||
12. Agregasi ke level ASEAN per indicator_id
|
|
||||||
13. Hitung YoY avg_value per indikator
|
|
||||||
14. Assign performance berdasarkan avg_norm_score
|
|
||||||
15. Build narrative 1 paragraf per indikator
|
|
||||||
16. Simpan ke BigQuery -> agg_narrative_indicator
|
|
||||||
17. Summary log agg_narrative_indicator
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, client: bigquery.Client):
|
def __init__(self, client: bigquery.Client):
|
||||||
@@ -445,11 +580,6 @@ class IndicatorNormAggregator:
|
|||||||
self.logger.info(
|
self.logger.info(
|
||||||
f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}"
|
f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}"
|
||||||
)
|
)
|
||||||
unique_units = self.df["unit"].value_counts().to_dict()
|
|
||||||
self.logger.info(" Distribusi unit (top 10):")
|
|
||||||
for u, cnt in list(unique_units.items())[:10]:
|
|
||||||
label = repr(u) if u == "" else u
|
|
||||||
self.logger.info(f" {label:<30}: {cnt:,} rows")
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 4: Deteksi sdgs_start_year
|
# STEP 4: Deteksi sdgs_start_year
|
||||||
@@ -465,12 +595,7 @@ class IndicatorNormAggregator:
|
|||||||
]
|
]
|
||||||
if not fies_rows.empty:
|
if not fies_rows.empty:
|
||||||
sdgs_start = int(fies_rows["year"].min())
|
sdgs_start = int(fies_rows["year"].min())
|
||||||
n_fies_ind = fies_rows["indicator_name"].nunique()
|
|
||||||
self.logger.info(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}")
|
self.logger.info(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}")
|
||||||
self.logger.info(f" FIES indicators found: {n_fies_ind}, first year = {sdgs_start}")
|
|
||||||
for nm in fies_rows["indicator_name"].unique():
|
|
||||||
min_y = int(fies_rows[fies_rows["indicator_name"] == nm]["year"].min())
|
|
||||||
self.logger.info(f" - {nm[:60]} (first year: {min_y})")
|
|
||||||
return sdgs_start
|
return sdgs_start
|
||||||
|
|
||||||
self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar")
|
self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar")
|
||||||
@@ -480,7 +605,6 @@ class IndicatorNormAggregator:
|
|||||||
.rename(columns={"year": "min_year"})
|
.rename(columns={"year": "min_year"})
|
||||||
)
|
)
|
||||||
unique_years = sorted(ind_min_year["min_year"].unique())
|
unique_years = sorted(ind_min_year["min_year"].unique())
|
||||||
self.logger.info(f" Unique min_year per indikator: {unique_years}")
|
|
||||||
|
|
||||||
if len(unique_years) == 1:
|
if len(unique_years) == 1:
|
||||||
sdgs_start = int(unique_years[0]) + 9999
|
sdgs_start = int(unique_years[0]) + 9999
|
||||||
@@ -510,13 +634,11 @@ class IndicatorNormAggregator:
|
|||||||
self.logger.info("=" * 80)
|
self.logger.info("=" * 80)
|
||||||
|
|
||||||
df = self.df.copy()
|
df = self.df.copy()
|
||||||
|
|
||||||
df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
|
df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
|
||||||
df["framework"] = "MDGs"
|
df["framework"] = "MDGs"
|
||||||
|
|
||||||
mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year)
|
mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year)
|
||||||
df.loc[mask_sdgs, "framework"] = "SDGs"
|
df.loc[mask_sdgs, "framework"] = "SDGs"
|
||||||
|
|
||||||
df = df.drop(columns=["_is_sdg_kw"])
|
df = df.drop(columns=["_is_sdg_kw"])
|
||||||
|
|
||||||
fw_dist = df["framework"].value_counts()
|
fw_dist = df["framework"].value_counts()
|
||||||
@@ -524,32 +646,10 @@ class IndicatorNormAggregator:
|
|||||||
for fw, cnt in fw_dist.items():
|
for fw, cnt in fw_dist.items():
|
||||||
self.logger.info(f" {fw:<6}: {cnt:,} rows")
|
self.logger.info(f" {fw:<6}: {cnt:,} rows")
|
||||||
|
|
||||||
dual = (
|
|
||||||
df.groupby("indicator_id")["framework"]
|
|
||||||
.nunique()
|
|
||||||
.reset_index()
|
|
||||||
.rename(columns={"framework": "n_frameworks"})
|
|
||||||
)
|
|
||||||
dual_ids = dual[dual["n_frameworks"] > 1]["indicator_id"].tolist()
|
|
||||||
self.logger.info(
|
|
||||||
f"\n Indikator dengan DUAL framework (MDGs + SDGs): {len(dual_ids)}"
|
|
||||||
)
|
|
||||||
if dual_ids:
|
|
||||||
for iid in dual_ids:
|
|
||||||
ind_name = df[df["indicator_id"] == iid]["indicator_name"].iloc[0]
|
|
||||||
yr_range = df[df["indicator_id"] == iid][["year", "framework"]].drop_duplicates()
|
|
||||||
mdgs_yrs = sorted(yr_range[yr_range["framework"] == "MDGs"]["year"].tolist())
|
|
||||||
sdgs_yrs = sorted(yr_range[yr_range["framework"] == "SDGs"]["year"].tolist())
|
|
||||||
self.logger.info(
|
|
||||||
f" [{iid}] {ind_name[:55]}\n"
|
|
||||||
f" MDGs years: {mdgs_yrs}\n"
|
|
||||||
f" SDGs years: {sdgs_yrs}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.df = df
|
self.df = df
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 6: Hitung norm_value per indikator (direction-aware)
|
# STEP 6: Hitung norm_value per indikator
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _compute_norm_values(self) -> pd.DataFrame:
|
def _compute_norm_values(self) -> pd.DataFrame:
|
||||||
@@ -595,17 +695,11 @@ class IndicatorNormAggregator:
|
|||||||
norm_parts.append(grp)
|
norm_parts.append(grp)
|
||||||
|
|
||||||
df_normed = pd.concat(norm_parts, ignore_index=True)
|
df_normed = pd.concat(norm_parts, ignore_index=True)
|
||||||
|
|
||||||
self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators")
|
self.logger.info(f" norm_value computed: {df_normed['indicator_id'].nunique()} indicators")
|
||||||
self.logger.info(
|
|
||||||
f" norm_value range : "
|
|
||||||
f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}"
|
|
||||||
)
|
|
||||||
self.logger.info(f" norm_value nulls : {df_normed['norm_value'].isna().sum()}")
|
|
||||||
return df_normed
|
return df_normed
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 7: Hitung YoY per (indicator_id, country_id)
|
# STEP 7: Hitung YoY
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _compute_yoy_columns(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
@@ -621,21 +715,8 @@ class IndicatorNormAggregator:
|
|||||||
parts.append(_compute_yoy(grp))
|
parts.append(_compute_yoy(grp))
|
||||||
|
|
||||||
df_out = pd.concat(parts, ignore_index=True)
|
df_out = pd.concat(parts, ignore_index=True)
|
||||||
|
self.logger.info(f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}")
|
||||||
self.logger.info(
|
self.logger.info(f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}")
|
||||||
f" yoy_value nulls : {df_out['yoy_value'].isna().sum():,}"
|
|
||||||
)
|
|
||||||
self.logger.info(
|
|
||||||
f" yoy_value range : "
|
|
||||||
f"{df_out['yoy_value'].min():.4f} - {df_out['yoy_value'].max():.4f}"
|
|
||||||
)
|
|
||||||
self.logger.info(
|
|
||||||
f" yoy_norm_value nulls: {df_out['yoy_norm_value'].isna().sum():,}"
|
|
||||||
)
|
|
||||||
self.logger.info(
|
|
||||||
f" yoy_norm_value range: "
|
|
||||||
f"{df_out['yoy_norm_value'].min():.4f} - {df_out['yoy_norm_value'].max():.4f}"
|
|
||||||
)
|
|
||||||
return df_out
|
return df_out
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
@@ -690,7 +771,7 @@ class IndicatorNormAggregator:
|
|||||||
return df
|
return df
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 10: Save agg_indicator_norm to BigQuery
|
# STEP 10: Save agg_indicator_norm
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _save(self, df: pd.DataFrame) -> int:
|
def _save(self, df: pd.DataFrame) -> int:
|
||||||
@@ -701,22 +782,11 @@ class IndicatorNormAggregator:
|
|||||||
self.logger.info("=" * 80)
|
self.logger.info("=" * 80)
|
||||||
|
|
||||||
out = df[[
|
out = df[[
|
||||||
"year",
|
"year", "country_id", "country_name",
|
||||||
"country_id",
|
"indicator_id", "indicator_name", "unit", "direction",
|
||||||
"country_name",
|
"pillar_id", "pillar_name", "framework",
|
||||||
"indicator_id",
|
"value", "norm_value", "norm_score_1_100",
|
||||||
"indicator_name",
|
"yoy_value", "yoy_norm_value", "performance",
|
||||||
"unit",
|
|
||||||
"direction",
|
|
||||||
"pillar_id",
|
|
||||||
"pillar_name",
|
|
||||||
"framework",
|
|
||||||
"value",
|
|
||||||
"norm_value",
|
|
||||||
"norm_score_1_100",
|
|
||||||
"yoy_value",
|
|
||||||
"yoy_norm_value",
|
|
||||||
"performance",
|
|
||||||
]].copy()
|
]].copy()
|
||||||
|
|
||||||
out = out.sort_values(
|
out = out.sort_values(
|
||||||
@@ -740,13 +810,10 @@ class IndicatorNormAggregator:
|
|||||||
out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float)
|
out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float)
|
||||||
out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string")
|
out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string")
|
||||||
|
|
||||||
self.logger.info(f" Columns : {list(out.columns)}")
|
|
||||||
self.logger.info(f" Total rows : {len(out):,}")
|
self.logger.info(f" Total rows : {len(out):,}")
|
||||||
self.logger.info(f" Countries : {out['country_id'].nunique()}")
|
self.logger.info(f" Countries : {out['country_id'].nunique()}")
|
||||||
self.logger.info(f" Indicators : {out['indicator_id'].nunique()}")
|
self.logger.info(f" Indicators : {out['indicator_id'].nunique()}")
|
||||||
self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")
|
self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")
|
||||||
self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}")
|
|
||||||
self.logger.info(f" Performance: {dict(out['performance'].value_counts())}")
|
|
||||||
|
|
||||||
schema = [
|
schema = [
|
||||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||||
@@ -793,11 +860,6 @@ class IndicatorNormAggregator:
|
|||||||
"yoy_columns" : ["yoy_value", "yoy_norm_value"],
|
"yoy_columns" : ["yoy_value", "yoy_norm_value"],
|
||||||
"performance_threshold": _PERFORMANCE_THRESHOLD,
|
"performance_threshold": _PERFORMANCE_THRESHOLD,
|
||||||
"unit_source" : "dim_indicator",
|
"unit_source" : "dim_indicator",
|
||||||
"framework_logic" : (
|
|
||||||
"SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, "
|
|
||||||
"SDGs if year >= sdgs_start_year. "
|
|
||||||
"Non-SDG_ONLY: always MDGs."
|
|
||||||
),
|
|
||||||
}),
|
}),
|
||||||
"validation_metrics" : json.dumps({
|
"validation_metrics" : json.dumps({
|
||||||
"total_rows" : rows_loaded,
|
"total_rows" : rows_loaded,
|
||||||
@@ -807,11 +869,10 @@ class IndicatorNormAggregator:
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
save_etl_metadata(self.client, metadata)
|
save_etl_metadata(self.client, metadata)
|
||||||
self.logger.info(" Metadata -> [AUDIT] etl_metadata")
|
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 11: Summary log agg_indicator_norm
|
# STEP 11: Summary log
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _log_summary(self, df: pd.DataFrame):
|
def _log_summary(self, df: pd.DataFrame):
|
||||||
@@ -819,140 +880,81 @@ class IndicatorNormAggregator:
|
|||||||
self.logger.info("STEP 11: SUMMARY — agg_indicator_norm")
|
self.logger.info("STEP 11: SUMMARY — agg_indicator_norm")
|
||||||
self.logger.info("=" * 80)
|
self.logger.info("=" * 80)
|
||||||
|
|
||||||
summary = (
|
|
||||||
df.groupby(["framework", "year"])
|
|
||||||
.agg(
|
|
||||||
n_indicators=("indicator_id", "nunique"),
|
|
||||||
n_countries =("country_id", "nunique"),
|
|
||||||
avg_score =("norm_score_1_100", "mean"),
|
|
||||||
)
|
|
||||||
.reset_index()
|
|
||||||
)
|
|
||||||
self.logger.info(
|
|
||||||
f"\n{'Framework':<8} {'Year':<6} {'Indicators':<12} {'Countries':<12} {'Avg Score'}"
|
|
||||||
)
|
|
||||||
self.logger.info("-" * 55)
|
|
||||||
for _, r in summary.iterrows():
|
|
||||||
self.logger.info(
|
|
||||||
f"{r['framework']:<8} {int(r['year']):<6} "
|
|
||||||
f"{int(r['n_indicators']):<12} {int(r['n_countries']):<12} "
|
|
||||||
f"{r['avg_score']:.2f}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger.info("\n Performance summary per Framework:")
|
|
||||||
perf_fw = (
|
|
||||||
df[df["performance"].notna()]
|
|
||||||
.groupby(["framework", "performance"])
|
|
||||||
.size()
|
|
||||||
.reset_index(name="count")
|
|
||||||
)
|
|
||||||
for fw in perf_fw["framework"].unique():
|
|
||||||
sub = perf_fw[perf_fw["framework"] == fw]
|
|
||||||
total = sub["count"].sum()
|
|
||||||
self.logger.info(f" [{fw}]")
|
|
||||||
for _, r in sub.iterrows():
|
|
||||||
self.logger.info(
|
|
||||||
f" {r['performance']:<6}: {int(r['count']):,} "
|
|
||||||
f"({r['count']/total*100:.1f}%)"
|
|
||||||
)
|
|
||||||
|
|
||||||
ind_avg = (
|
ind_avg = (
|
||||||
df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"])
|
df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"])
|
||||||
["norm_score_1_100"].mean()
|
["norm_score_1_100"].mean()
|
||||||
.reset_index()
|
.reset_index()
|
||||||
.sort_values("norm_score_1_100", ascending=False)
|
.sort_values("norm_score_1_100", ascending=False)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info("\n TOP 5 Indicators (avg norm_score_1_100):")
|
||||||
"\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):"
|
|
||||||
)
|
|
||||||
for _, r in ind_avg.head(5).iterrows():
|
for _, r in ind_avg.head(5).iterrows():
|
||||||
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
||||||
unit = f"[{r['unit']}]" if r["unit"] else ""
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
|
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
|
||||||
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
|
f"{r['norm_score_1_100']:.2f} {tag}"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info("\n BOTTOM 5 Indicators:")
|
self.logger.info("\n BOTTOM 5 Indicators:")
|
||||||
for _, r in ind_avg.tail(5).iterrows():
|
for _, r in ind_avg.tail(5).iterrows():
|
||||||
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
||||||
unit = f"[{r['unit']}]" if r["unit"] else ""
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
|
f" [{int(r['indicator_id'])}] {r['indicator_name'][:50]:<52} "
|
||||||
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
|
f"{r['norm_score_1_100']:.2f} {tag}"
|
||||||
)
|
)
|
||||||
|
|
||||||
pillar_summary = (
|
|
||||||
df.drop_duplicates(subset=["indicator_id", "pillar_name"])
|
|
||||||
.groupby("pillar_name")["indicator_id"]
|
|
||||||
.count()
|
|
||||||
.reset_index()
|
|
||||||
.rename(columns={"indicator_id": "n_indicators"})
|
|
||||||
)
|
|
||||||
self.logger.info("\n Indicators per pillar:")
|
|
||||||
for _, r in pillar_summary.iterrows():
|
|
||||||
self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 12-16: agg_narrative_indicator
|
# STEP 12-17: agg_narrative_indicator
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _build_narrative_table(self, df_final: pd.DataFrame):
|
def _build_narrative_table(self, df_final: pd.DataFrame):
|
||||||
"""
|
|
||||||
Pipeline agg_narrative_indicator.
|
|
||||||
Granularity: per indicator_id (1 baris per indikator, all years, all countries).
|
|
||||||
"""
|
|
||||||
self.logger.info("\n" + "=" * 80)
|
self.logger.info("\n" + "=" * 80)
|
||||||
self.logger.info("STEP 12-16: agg_narrative_indicator")
|
self.logger.info("STEP 12-17: agg_narrative_indicator")
|
||||||
self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)")
|
self.logger.info(" Granularity: per indicator_id (all years + all ASEAN countries)")
|
||||||
|
self.logger.info(" Narrative : interpretatif, plain text, bilingual EN/ID")
|
||||||
self.logger.info("=" * 80)
|
self.logger.info("=" * 80)
|
||||||
|
|
||||||
df = df_final.copy()
|
df = df_final.copy()
|
||||||
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
|
|
||||||
|
|
||||||
# ---- 12a. ASEAN avg per (indicator_id, year) -------------------------
|
# ---- Agregasi statistik per indikator ----
|
||||||
self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---")
|
|
||||||
df_yr = (
|
df_yr = (
|
||||||
df.groupby(["indicator_id", "year"])
|
df.groupby(["indicator_id", "year"])
|
||||||
.agg(
|
.agg(
|
||||||
avg_value =("value", "mean"),
|
avg_value =("value", "mean"),
|
||||||
avg_norm_score =("norm_score_1_100", "mean"),
|
avg_norm_score =("norm_score_1_100", "mean"),
|
||||||
n_countries_year =("country_id", "nunique"),
|
n_countries_yr =("country_id", "nunique"),
|
||||||
)
|
)
|
||||||
.reset_index()
|
.reset_index()
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12b. first / last avg value per indikator -----------------------
|
|
||||||
df_first = (
|
df_first = (
|
||||||
df_yr.sort_values("year")
|
df_yr.sort_values("year").groupby("indicator_id").first().reset_index()
|
||||||
.groupby("indicator_id").first().reset_index()
|
|
||||||
[["indicator_id", "year", "avg_value"]]
|
[["indicator_id", "year", "avg_value"]]
|
||||||
.rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
|
.rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
|
||||||
)
|
)
|
||||||
df_last = (
|
df_last = (
|
||||||
df_yr.sort_values("year")
|
df_yr.sort_values("year").groupby("indicator_id").last().reset_index()
|
||||||
.groupby("indicator_id").last().reset_index()
|
|
||||||
[["indicator_id", "year", "avg_value"]]
|
[["indicator_id", "year", "avg_value"]]
|
||||||
.rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
|
.rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12c. Rata-rata norm_score seluruh periode -----------------------
|
|
||||||
df_score_avg = (
|
df_score_avg = (
|
||||||
df_yr.groupby("indicator_id")
|
df_yr.groupby("indicator_id")
|
||||||
.agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
|
.agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
|
||||||
.reset_index()
|
.reset_index()
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12d. n_countries ------------------------------------------------
|
|
||||||
df_nc = (
|
df_nc = (
|
||||||
df.groupby("indicator_id")["country_id"]
|
df.groupby("indicator_id")["country_id"]
|
||||||
.nunique().reset_index()
|
.nunique().reset_index()
|
||||||
.rename(columns={"country_id": "n_countries"})
|
.rename(columns={"country_id": "n_countries"})
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12e. YoY per indicator (ASEAN avg) ------------------------------
|
# YoY stats
|
||||||
self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---")
|
dir_map = (
|
||||||
|
df[["indicator_id", "direction"]]
|
||||||
|
.drop_duplicates(subset=["indicator_id"])
|
||||||
|
.set_index("indicator_id")["direction"]
|
||||||
|
.to_dict()
|
||||||
|
)
|
||||||
|
|
||||||
yoy_parts = []
|
yoy_parts = []
|
||||||
for ind_id, grp in df_yr.groupby("indicator_id"):
|
for ind_id, grp in df_yr.groupby("indicator_id"):
|
||||||
@@ -967,13 +969,6 @@ class IndicatorNormAggregator:
|
|||||||
yoy_parts.append(grp)
|
yoy_parts.append(grp)
|
||||||
df_yr = pd.concat(yoy_parts, ignore_index=True)
|
df_yr = pd.concat(yoy_parts, ignore_index=True)
|
||||||
|
|
||||||
dir_map = (
|
|
||||||
df[["indicator_id", "direction"]]
|
|
||||||
.drop_duplicates(subset=["indicator_id"])
|
|
||||||
.set_index("indicator_id")["direction"]
|
|
||||||
.to_dict()
|
|
||||||
)
|
|
||||||
|
|
||||||
def _is_positive_yoy(ind_id, yoy_val):
|
def _is_positive_yoy(ind_id, yoy_val):
|
||||||
if pd.isna(yoy_val):
|
if pd.isna(yoy_val):
|
||||||
return False
|
return False
|
||||||
@@ -1003,16 +998,14 @@ class IndicatorNormAggregator:
|
|||||||
"best_yoy_from" : best_yoy_from,
|
"best_yoy_from" : best_yoy_from,
|
||||||
"best_yoy_to" : best_yoy_to,
|
"best_yoy_to" : best_yoy_to,
|
||||||
})
|
})
|
||||||
|
|
||||||
df_yoy_stats = pd.DataFrame(yoy_stats)
|
df_yoy_stats = pd.DataFrame(yoy_stats)
|
||||||
|
|
||||||
# ---- 12f. Country terbaik & terburuk ---------------------------------
|
# Country best/worst
|
||||||
df_country_avg = (
|
df_country_avg = (
|
||||||
df.groupby(["indicator_id", "country_id", "country_name"])
|
df.groupby(["indicator_id", "country_id", "country_name"])
|
||||||
.agg(country_avg_value=("value", "mean"))
|
.agg(country_avg_value=("value", "mean"))
|
||||||
.reset_index()
|
.reset_index()
|
||||||
)
|
)
|
||||||
|
|
||||||
country_stats = []
|
country_stats = []
|
||||||
for ind_id, grp in df_country_avg.groupby("indicator_id"):
|
for ind_id, grp in df_country_avg.groupby("indicator_id"):
|
||||||
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
||||||
@@ -1029,13 +1022,11 @@ class IndicatorNormAggregator:
|
|||||||
})
|
})
|
||||||
df_country_stats = pd.DataFrame(country_stats)
|
df_country_stats = pd.DataFrame(country_stats)
|
||||||
|
|
||||||
# ---- 12g. Dimensi tetap per indikator --------------------------------
|
# Dim cols
|
||||||
df_dim = (
|
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
|
||||||
df[["indicator_id"] + dim_cols]
|
df_dim = df[["indicator_id"] + dim_cols].drop_duplicates(subset=["indicator_id"])
|
||||||
.drop_duplicates(subset=["indicator_id"])
|
|
||||||
)
|
|
||||||
|
|
||||||
# ---- 12h. Merge semua ------------------------------------------------
|
# Merge semua
|
||||||
df_agg = (
|
df_agg = (
|
||||||
df_dim
|
df_dim
|
||||||
.merge(df_first, on="indicator_id", how="left")
|
.merge(df_first, on="indicator_id", how="left")
|
||||||
@@ -1046,32 +1037,32 @@ class IndicatorNormAggregator:
|
|||||||
.merge(df_country_stats, on="indicator_id", how="left")
|
.merge(df_country_stats, on="indicator_id", how="left")
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}")
|
# Performance
|
||||||
self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}")
|
|
||||||
|
|
||||||
# -- STEP 14: Assign performance ---------------------------------------
|
|
||||||
self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
|
|
||||||
df_agg["performance"] = pd.NA
|
df_agg["performance"] = pd.NA
|
||||||
has_score = df_agg["avg_norm_score_1_100"].notna()
|
has_score = df_agg["avg_norm_score_1_100"].notna()
|
||||||
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
|
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
|
||||||
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
|
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
|
||||||
n_good = (df_agg["performance"] == "Good").sum()
|
|
||||||
n_bad = (df_agg["performance"] == "Bad").sum()
|
|
||||||
self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")
|
|
||||||
|
|
||||||
# -- STEP 15: Build narrative ------------------------------------------
|
# ---- Build narrative (bilingual, interpretatif, plain text) ----
|
||||||
self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---")
|
self.logger.info("\n--- BUILD NARRATIVE (interpretatif, plain text, bilingual EN/ID) ---")
|
||||||
df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1)
|
narratives_en = []
|
||||||
|
narratives_id = []
|
||||||
|
|
||||||
|
for _, row in df_agg.iterrows():
|
||||||
|
n_en, n_id = _build_narrative_per_indicator(row, df)
|
||||||
|
narratives_en.append(n_en)
|
||||||
|
narratives_id.append(n_id)
|
||||||
|
|
||||||
|
df_agg["narrative_en"] = narratives_en
|
||||||
|
df_agg["narrative_id"] = narratives_id
|
||||||
|
|
||||||
self.logger.info(f" Narratives generated: {len(df_agg):,}")
|
self.logger.info(f" Narratives generated: {len(df_agg):,}")
|
||||||
self.logger.info("\n Sample (first 2):")
|
self.logger.info("\n Sample EN (first):")
|
||||||
for _, row in df_agg.head(2).iterrows():
|
self.logger.info(f" {df_agg.iloc[0]['narrative_en'][:300]}")
|
||||||
self.logger.info(
|
self.logger.info("\n Sample ID (first):")
|
||||||
f"\n [{int(row['indicator_id'])}] {row['indicator_name'][:60]}"
|
self.logger.info(f" {df_agg.iloc[0]['narrative_id'][:300]}")
|
||||||
f"\n -> {row['narrative'][:300]}..."
|
|
||||||
)
|
|
||||||
|
|
||||||
# -- STEP 16: Save -----------------------------------------------------
|
# ---- Save ----
|
||||||
self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
|
|
||||||
out = df_agg[[
|
out = df_agg[[
|
||||||
"indicator_id", "indicator_name", "unit", "direction",
|
"indicator_id", "indicator_name", "unit", "direction",
|
||||||
"pillar_name", "framework",
|
"pillar_name", "framework",
|
||||||
@@ -1081,7 +1072,7 @@ class IndicatorNormAggregator:
|
|||||||
"n_yoy_total", "n_yoy_positive",
|
"n_yoy_total", "n_yoy_positive",
|
||||||
"best_yoy_from", "best_yoy_to",
|
"best_yoy_from", "best_yoy_to",
|
||||||
"country_worst", "country_best",
|
"country_worst", "country_best",
|
||||||
"narrative",
|
"narrative_en", "narrative_id",
|
||||||
]].copy()
|
]].copy()
|
||||||
|
|
||||||
out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)
|
out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)
|
||||||
@@ -1105,7 +1096,8 @@ class IndicatorNormAggregator:
|
|||||||
out["best_yoy_to"] = pd.to_numeric(out["best_yoy_to"], errors="coerce").astype("Int64")
|
out["best_yoy_to"] = pd.to_numeric(out["best_yoy_to"], errors="coerce").astype("Int64")
|
||||||
out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string")
|
out["country_worst"] = out["country_worst"].astype(str).replace("nan", pd.NA).astype("string")
|
||||||
out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string")
|
out["country_best"] = out["country_best"].astype(str).replace("nan", pd.NA).astype("string")
|
||||||
out["narrative"] = out["narrative"].astype(str)
|
out["narrative_en"] = out["narrative_en"].astype(str)
|
||||||
|
out["narrative_id"] = out["narrative_id"].astype(str)
|
||||||
|
|
||||||
schema = [
|
schema = [
|
||||||
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
|
||||||
@@ -1127,7 +1119,8 @@ class IndicatorNormAggregator:
|
|||||||
bigquery.SchemaField("best_yoy_to", "INTEGER", mode="NULLABLE"),
|
bigquery.SchemaField("best_yoy_to", "INTEGER", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("country_worst", "STRING", mode="NULLABLE"),
|
bigquery.SchemaField("country_worst", "STRING", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"),
|
bigquery.SchemaField("country_best", "STRING", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"),
|
bigquery.SchemaField("narrative_en", "STRING", mode="NULLABLE"),
|
||||||
|
bigquery.SchemaField("narrative_id", "STRING", mode="NULLABLE"),
|
||||||
]
|
]
|
||||||
|
|
||||||
rows_loaded = load_to_bigquery(
|
rows_loaded = load_to_bigquery(
|
||||||
@@ -1152,7 +1145,8 @@ class IndicatorNormAggregator:
|
|||||||
"config_snapshot" : json.dumps({
|
"config_snapshot" : json.dumps({
|
||||||
"source_table" : "agg_indicator_norm (in-memory df_final)",
|
"source_table" : "agg_indicator_norm (in-memory df_final)",
|
||||||
"granularity" : "indicator_id only (all years, all ASEAN countries)",
|
"granularity" : "indicator_id only (all years, all ASEAN countries)",
|
||||||
"aggregation" : "full-period summary per indicator",
|
"narrative_style" : "interpretive, plain text, no markdown, bilingual EN/ID",
|
||||||
|
"narrative_dimensions" : ["trend", "gap_trend", "anomaly", "country_consistency"],
|
||||||
"performance_threshold": _PERFORMANCE_THRESHOLD,
|
"performance_threshold": _PERFORMANCE_THRESHOLD,
|
||||||
"layer" : "gold",
|
"layer" : "gold",
|
||||||
}),
|
}),
|
||||||
@@ -1162,8 +1156,6 @@ class IndicatorNormAggregator:
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
save_etl_metadata(self.client, metadata)
|
save_etl_metadata(self.client, metadata)
|
||||||
self.logger.info(" Metadata -> [AUDIT] etl_metadata")
|
|
||||||
|
|
||||||
self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
|
self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
@@ -1194,7 +1186,6 @@ class IndicatorNormAggregator:
|
|||||||
rows_loaded = self._save(df_final)
|
rows_loaded = self._save(df_final)
|
||||||
self.pipeline_metadata["rows_loaded"] = rows_loaded
|
self.pipeline_metadata["rows_loaded"] = rows_loaded
|
||||||
self._log_summary(df_final)
|
self._log_summary(df_final)
|
||||||
|
|
||||||
self._build_narrative_table(df_final)
|
self._build_narrative_table(df_final)
|
||||||
|
|
||||||
self.pipeline_metadata["end_time"] = datetime.now()
|
self.pipeline_metadata["end_time"] = datetime.now()
|
||||||
@@ -1217,10 +1208,6 @@ class IndicatorNormAggregator:
|
|||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def run_indicator_norm_aggregation():
|
def run_indicator_norm_aggregation():
|
||||||
"""
|
|
||||||
Airflow task: Build agg_indicator_norm + agg_narrative_indicator.
|
|
||||||
Dipanggil setelah analytical_layer_to_gold selesai.
|
|
||||||
"""
|
|
||||||
client = get_bigquery_client()
|
client = get_bigquery_client()
|
||||||
agg = IndicatorNormAggregator(client)
|
agg = IndicatorNormAggregator(client)
|
||||||
agg.run()
|
agg.run()
|
||||||
@@ -1241,10 +1228,6 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
print("=" * 80)
|
print("=" * 80)
|
||||||
print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
|
print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
|
||||||
print(" Source : fact_asean_food_security_selected")
|
|
||||||
print(" Dim : dim_indicator (unit)")
|
|
||||||
print(" Output : agg_indicator_norm")
|
|
||||||
print(" agg_narrative_indicator")
|
|
||||||
print("=" * 80)
|
print("=" * 80)
|
||||||
|
|
||||||
logger = setup_logging()
|
logger = setup_logging()
|
||||||
|
|||||||
@@ -6,18 +6,14 @@ UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'):
|
|||||||
- agg_pillar_by_country
|
- agg_pillar_by_country
|
||||||
- agg_framework_by_country
|
- agg_framework_by_country
|
||||||
- agg_framework_asean (+ kolom performance_status: 'Good'/'Bad', threshold=60)
|
- agg_framework_asean (+ kolom performance_status: 'Good'/'Bad', threshold=60)
|
||||||
- agg_narrative_overview
|
- agg_narrative_overview (bilingual: narrative_en, narrative_id)
|
||||||
- agg_narrative_pillar
|
- agg_narrative_pillar (bilingual: narrative_en, narrative_id)
|
||||||
|
|
||||||
SOURCE TABLE: fact_asean_food_security_selected (sudah include nama + ID)
|
Narrative style:
|
||||||
|
- Plain text, tanpa markdown bold (**)
|
||||||
n_indicators logic (sesuai agg_indicator_norm):
|
- Interpretatif: membaca tren, gap, anomali, konsistensi dari data nyata
|
||||||
- Setiap tahun dihitung dari indikator yang benar-benar hadir di tahun tsb.
|
- Bilingual: narrative_en (Inggris) + narrative_id (Indonesia)
|
||||||
- Framework MDGs/SDGs per tahun mengikuti SDG_ONLY_KEYWORDS:
|
- Granularity: per tahun (Overview & Pillar)
|
||||||
* Indikator tidak di SDG_ONLY -> selalu MDGs
|
|
||||||
* Indikator di SDG_ONLY + year >= sdgs_start_year -> SDGs
|
|
||||||
* Indikator di SDG_ONLY + year < sdgs_start_year -> MDGs
|
|
||||||
- Sehingga n_indicators MDGs dan SDGs bisa berbeda antar tahun.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@@ -50,11 +46,8 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({
|
|||||||
})
|
})
|
||||||
|
|
||||||
NORMALIZE_FRAMEWORKS_JOINTLY = False
|
NORMALIZE_FRAMEWORKS_JOINTLY = False
|
||||||
|
PERFORMANCE_THRESHOLD = 60.0
|
||||||
|
|
||||||
# Threshold performance_status di agg_framework_asean
|
|
||||||
PERFORMANCE_THRESHOLD = 60.0 # score >= 60 -> "Good", < 60 -> "Bad"
|
|
||||||
|
|
||||||
# SDG_ONLY_KEYWORDS (sama persis dengan bigquery_aggraget_fact_selected_layer.py)
|
|
||||||
SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
||||||
"prevalence of undernourishment (percent) (3-year average)",
|
"prevalence of undernourishment (percent) (3-year average)",
|
||||||
"number of people undernourished (million) (3-year average)",
|
"number of people undernourished (million) (3-year average)",
|
||||||
@@ -90,7 +83,7 @@ _FIES_DETECTION_LOWER: frozenset = frozenset([
|
|||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# Windows CP1252 safe logging
|
# WINDOWS CP1252 SAFE LOGGING
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
class _SafeStreamHandler(logging.StreamHandler):
|
class _SafeStreamHandler(logging.StreamHandler):
|
||||||
@@ -178,16 +171,11 @@ def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger=
|
|||||||
|
|
||||||
|
|
||||||
def _performance_status(score) -> str:
|
def _performance_status(score) -> str:
|
||||||
"""Classify score into 'Good' or 'Bad' based on PERFORMANCE_THRESHOLD."""
|
|
||||||
if score is None or (isinstance(score, float) and np.isnan(score)):
|
if score is None or (isinstance(score, float) and np.isnan(score)):
|
||||||
return "N/A"
|
return "N/A"
|
||||||
return "Good" if score >= PERFORMANCE_THRESHOLD else "Bad"
|
return "Good" if score >= PERFORMANCE_THRESHOLD else "Bad"
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# NARRATIVE HELPERS
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
def _fmt_score(score) -> str:
|
def _fmt_score(score) -> str:
|
||||||
if score is None or (isinstance(score, float) and np.isnan(score)):
|
if score is None or (isinstance(score, float) and np.isnan(score)):
|
||||||
return "N/A"
|
return "N/A"
|
||||||
@@ -201,80 +189,235 @@ def _fmt_delta(delta) -> str:
|
|||||||
return f"{sign}{delta:.2f}"
|
return f"{sign}{delta:.2f}"
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# NARRATIVE CONDITION DETECTORS (shared)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def _detect_series_trend(scores: list) -> str:
|
||||||
|
"""
|
||||||
|
Deteksi tren dari list skor berurutan.
|
||||||
|
Return: 'improving_consistent' | 'improving_slowing' | 'deteriorating' | 'fluctuating'
|
||||||
|
"""
|
||||||
|
if len(scores) < 3:
|
||||||
|
return "insufficient"
|
||||||
|
|
||||||
|
x = np.arange(len(scores))
|
||||||
|
slope = np.polyfit(x, scores, 1)[0]
|
||||||
|
cv = np.std(scores) / (np.mean(scores) + 1e-9)
|
||||||
|
|
||||||
|
if cv > 0.20:
|
||||||
|
return "fluctuating"
|
||||||
|
|
||||||
|
mid = len(scores) // 2
|
||||||
|
slope1 = np.polyfit(np.arange(mid), scores[:mid], 1)[0] if mid > 1 else slope
|
||||||
|
slope2 = np.polyfit(np.arange(len(scores) - mid), scores[mid:], 1)[0] if (len(scores) - mid) > 1 else slope
|
||||||
|
|
||||||
|
if slope > 0:
|
||||||
|
slowing = slope2 < slope1
|
||||||
|
return "improving_slowing" if slowing else "improving_consistent"
|
||||||
|
else:
|
||||||
|
return "deteriorating"
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_country_gap(scores_by_country_year: pd.DataFrame, score_col: str) -> str:
|
||||||
|
"""
|
||||||
|
Deteksi apakah std antar negara melebar atau menyempit dari waktu ke waktu.
|
||||||
|
scores_by_country_year: df dengan kolom [year, country_id, score_col]
|
||||||
|
"""
|
||||||
|
std_by_year = (
|
||||||
|
scores_by_country_year.groupby("year")[score_col]
|
||||||
|
.std().dropna()
|
||||||
|
)
|
||||||
|
if len(std_by_year) < 3:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
years = sorted(std_by_year.index)
|
||||||
|
stds = [std_by_year[y] for y in years]
|
||||||
|
slope = np.polyfit(np.arange(len(stds)), stds, 1)[0]
|
||||||
|
mean_s = np.mean(stds)
|
||||||
|
|
||||||
|
if abs(slope) < 0.02 * mean_s:
|
||||||
|
return "stable"
|
||||||
|
return "widening" if slope > 0 else "narrowing"
|
||||||
|
|
||||||
|
|
||||||
|
def _find_anomaly_year(values_by_year: dict) -> tuple:
|
||||||
|
"""
|
||||||
|
Cari tahun dengan perubahan YoY paling ekstrem.
|
||||||
|
values_by_year: {year: score}
|
||||||
|
Return: (year, 'drop' | 'rise') atau (None, None)
|
||||||
|
"""
|
||||||
|
years = sorted(values_by_year.keys())
|
||||||
|
deltas = {}
|
||||||
|
for i in range(1, len(years)):
|
||||||
|
y0, y1 = years[i-1], years[i]
|
||||||
|
v0, v1 = values_by_year.get(y0), values_by_year.get(y1)
|
||||||
|
if v0 is not None and v1 is not None and not (pd.isna(v0) or pd.isna(v1)):
|
||||||
|
deltas[y1] = v1 - v0
|
||||||
|
|
||||||
|
if not deltas:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
threshold = 1.5 * np.std(list(deltas.values()))
|
||||||
|
min_y = min(deltas, key=deltas.get)
|
||||||
|
max_y = max(deltas, key=deltas.get)
|
||||||
|
|
||||||
|
if abs(deltas[min_y]) > threshold and deltas[min_y] < 0:
|
||||||
|
return min_y, "drop"
|
||||||
|
if abs(deltas[max_y]) > threshold and deltas[max_y] > 0:
|
||||||
|
return max_y, "rise"
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# NARRATIVE BUILDER — OVERVIEW (per tahun)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
def _build_overview_narrative(
|
def _build_overview_narrative(
|
||||||
year: int,
|
year: int,
|
||||||
n_mdg: int,
|
|
||||||
n_sdg: int,
|
|
||||||
n_total_ind: int,
|
|
||||||
score: float,
|
score: float,
|
||||||
performance_status: str,
|
performance_status: str,
|
||||||
yoy_val,
|
yoy_val,
|
||||||
yoy_pct,
|
n_mdg: int,
|
||||||
prev_year: int,
|
n_sdg: int,
|
||||||
prev_score,
|
|
||||||
prev_performance_status: str,
|
|
||||||
ranking_list: list,
|
ranking_list: list,
|
||||||
most_improved_country,
|
most_improved_country,
|
||||||
most_improved_delta,
|
most_improved_delta,
|
||||||
most_declined_country,
|
most_declined_country,
|
||||||
most_declined_delta,
|
most_declined_delta,
|
||||||
) -> str:
|
historical_scores: dict, # {year: score} semua tahun sebelumnya
|
||||||
|
country_scores_all: pd.DataFrame, # df [year, country_name, framework_score_1_100]
|
||||||
|
) -> tuple:
|
||||||
"""
|
"""
|
||||||
Narrative format (no em-dash):
|
Narasi overview per tahun — interpretatif, plain text, bilingual.
|
||||||
In {year}, ASEAN scored {score} ({performance}) across {n_total} indicators
|
Return: (narrative_en, narrative_id)
|
||||||
({n_mdg} MDGs, {n_sdg} SDGs). Score {increased/decreased} by {delta} pts from
|
|
||||||
{prev_year} ({prev_score}). {top_country} led the region; {bottom_country} ranked
|
|
||||||
last. Biggest gain: {country}; biggest drop: {country}.
|
|
||||||
"""
|
"""
|
||||||
|
sentences_en = []
|
||||||
|
sentences_id = []
|
||||||
|
|
||||||
# Sentence 1: score + performance + indicators
|
# ---- 1. Status tahun ini vs threshold ----
|
||||||
ind_parts = []
|
perf_word_en = "good" if performance_status == "Good" else "below target"
|
||||||
if n_mdg > 0:
|
perf_word_id = "baik" if performance_status == "Good" else "di bawah target"
|
||||||
ind_parts.append(f"**{n_mdg} MDGs**")
|
|
||||||
if n_sdg > 0:
|
|
||||||
ind_parts.append(f"**{n_sdg} SDGs**")
|
|
||||||
ind_detail = f" ({', '.join(ind_parts)})" if ind_parts else ""
|
|
||||||
|
|
||||||
sent1 = (
|
s1_en = (
|
||||||
f"In **{year}**, ASEAN scored **{_fmt_score(score)}** (*{performance_status}*) "
|
f"In {year}, ASEAN food security scored {_fmt_score(score)} out of 100 "
|
||||||
f"across **{n_total_ind} indicators**{ind_detail}."
|
f"({perf_word_en}), covering {n_mdg + n_sdg} indicators "
|
||||||
|
f"({n_mdg} MDGs and {n_sdg} SDGs)."
|
||||||
)
|
)
|
||||||
|
s1_id = (
|
||||||
# Sentence 2: YoY
|
f"Pada tahun {year}, skor ketahanan pangan ASEAN mencapai {_fmt_score(score)} dari 100 "
|
||||||
if yoy_val is not None and prev_score is not None:
|
f"({perf_word_id}), mencakup {n_mdg + n_sdg} indikator "
|
||||||
direction_word = "increased" if yoy_val >= 0 else "decreased"
|
f"({n_mdg} MDGs dan {n_sdg} SDGs)."
|
||||||
sent2 = (
|
|
||||||
f"Score {direction_word} by **{abs(yoy_val):.2f} pts** "
|
|
||||||
f"from {prev_year} ({_fmt_score(prev_score)}, *{prev_performance_status}*)."
|
|
||||||
)
|
)
|
||||||
|
sentences_en.append(s1_en)
|
||||||
|
sentences_id.append(s1_id)
|
||||||
|
|
||||||
|
# ---- 2. Kondisi YoY tahun ini ----
|
||||||
|
if yoy_val is not None and not pd.isna(yoy_val):
|
||||||
|
if abs(yoy_val) < 0.5:
|
||||||
|
s2_en = f"The score was relatively stable compared to the previous year."
|
||||||
|
s2_id = f"Skor relatif stabil dibandingkan tahun sebelumnya."
|
||||||
|
elif yoy_val > 0:
|
||||||
|
s2_en = f"This represents an improvement of {abs(yoy_val):.2f} points from the previous year."
|
||||||
|
s2_id = f"Ini merupakan peningkatan sebesar {abs(yoy_val):.2f} poin dari tahun sebelumnya."
|
||||||
else:
|
else:
|
||||||
sent2 = "No prior-year data available for comparison."
|
s2_en = f"This represents a decline of {abs(yoy_val):.2f} points from the previous year."
|
||||||
|
s2_id = f"Ini merupakan penurunan sebesar {abs(yoy_val):.2f} poin dari tahun sebelumnya."
|
||||||
|
sentences_en.append(s2_en)
|
||||||
|
sentences_id.append(s2_id)
|
||||||
|
|
||||||
# Sentence 3: country ranking
|
# ---- 3. Tren historis (baca dari semua data yang ada) ----
|
||||||
sent3 = ""
|
hist_years = sorted(historical_scores.keys())
|
||||||
if ranking_list:
|
hist_scores = [historical_scores[y] for y in hist_years if not pd.isna(historical_scores.get(y, np.nan))]
|
||||||
first = ranking_list[0]
|
|
||||||
last = ranking_list[-1]
|
if len(hist_scores) >= 3:
|
||||||
if len(ranking_list) == 1:
|
trend = _detect_series_trend(hist_scores)
|
||||||
sent3 = f"**{first['country_name']}** was the only country assessed ({_fmt_score(first['score'])})."
|
if trend == "improving_consistent":
|
||||||
|
s3_en = f"The overall trajectory since {hist_years[0]} has been consistently upward."
|
||||||
|
s3_id = f"Trajektori keseluruhan sejak {hist_years[0]} menunjukkan tren yang konsisten meningkat."
|
||||||
|
elif trend == "improving_slowing":
|
||||||
|
s3_en = f"While the long-term trend since {hist_years[0]} is positive, the pace of improvement has slowed in recent years."
|
||||||
|
s3_id = f"Meskipun tren jangka panjang sejak {hist_years[0]} positif, laju perbaikan melambat dalam beberapa tahun terakhir."
|
||||||
|
elif trend == "deteriorating":
|
||||||
|
s3_en = f"The overall trend since {hist_years[0]} shows a declining trajectory that warrants attention."
|
||||||
|
s3_id = f"Tren keseluruhan sejak {hist_years[0]} menunjukkan trajektori yang menurun dan perlu perhatian."
|
||||||
|
elif trend == "fluctuating":
|
||||||
|
s3_en = f"Progress since {hist_years[0]} has been uneven, with scores fluctuating across years."
|
||||||
|
s3_id = f"Kemajuan sejak {hist_years[0]} tidak merata, dengan skor yang berfluktuasi antar tahun."
|
||||||
else:
|
else:
|
||||||
sent3 = (
|
s3_en = ""
|
||||||
f"**{first['country_name']}** led the region ({_fmt_score(first['score'])}); "
|
s3_id = ""
|
||||||
f"**{last['country_name']}** ranked last ({_fmt_score(last['score'])})."
|
|
||||||
|
if s3_en:
|
||||||
|
sentences_en.append(s3_en)
|
||||||
|
sentences_id.append(s3_id)
|
||||||
|
|
||||||
|
# ---- 4. Gap antar negara ----
|
||||||
|
if not country_scores_all.empty:
|
||||||
|
gap_trend = _detect_country_gap(
|
||||||
|
country_scores_all[country_scores_all["year"] <= year],
|
||||||
|
"framework_score_1_100"
|
||||||
)
|
)
|
||||||
|
if gap_trend == "widening":
|
||||||
|
s4_en = "The performance gap among ASEAN member states has widened over time, indicating unequal progress."
|
||||||
|
s4_id = "Kesenjangan performa antar negara anggota ASEAN semakin melebar, mengindikasikan kemajuan yang tidak merata."
|
||||||
|
elif gap_trend == "narrowing":
|
||||||
|
s4_en = "The performance gap among ASEAN member states has narrowed, reflecting more balanced regional development."
|
||||||
|
s4_id = "Kesenjangan performa antar negara anggota ASEAN semakin menyempit, mencerminkan pembangunan regional yang lebih merata."
|
||||||
|
elif gap_trend == "stable":
|
||||||
|
s4_en = "The performance gap among ASEAN member states has remained relatively stable."
|
||||||
|
s4_id = "Kesenjangan performa antar negara anggota ASEAN relatif stabil."
|
||||||
|
else:
|
||||||
|
s4_en = ""
|
||||||
|
s4_id = ""
|
||||||
|
|
||||||
# Sentence 4: most improved / declined
|
if s4_en:
|
||||||
sent4_parts = []
|
sentences_en.append(s4_en)
|
||||||
if most_improved_country and most_improved_delta is not None:
|
sentences_id.append(s4_id)
|
||||||
sent4_parts.append(f"Biggest gain: **{most_improved_country}** ({_fmt_delta(most_improved_delta)} pts)")
|
|
||||||
if most_declined_country and most_declined_delta is not None:
|
|
||||||
sent4_parts.append(f"biggest drop: **{most_declined_country}** ({_fmt_delta(most_declined_delta)} pts)")
|
|
||||||
sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else ""
|
|
||||||
if sent4:
|
|
||||||
sent4 = sent4[0].upper() + sent4[1:]
|
|
||||||
|
|
||||||
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
# ---- 5. Top dan bottom country tahun ini ----
|
||||||
|
if ranking_list and len(ranking_list) >= 2:
|
||||||
|
top = ranking_list[0]
|
||||||
|
bottom = ranking_list[-1]
|
||||||
|
s5_en = (
|
||||||
|
f"In {year}, {top['country_name']} led the region with a score of "
|
||||||
|
f"{_fmt_score(top['score'])}, while {bottom['country_name']} ranked last "
|
||||||
|
f"at {_fmt_score(bottom['score'])}."
|
||||||
|
)
|
||||||
|
s5_id = (
|
||||||
|
f"Pada tahun {year}, {top['country_name']} memimpin kawasan dengan skor "
|
||||||
|
f"{_fmt_score(top['score'])}, sementara {bottom['country_name']} berada di "
|
||||||
|
f"posisi terbawah dengan skor {_fmt_score(bottom['score'])}."
|
||||||
|
)
|
||||||
|
sentences_en.append(s5_en)
|
||||||
|
sentences_id.append(s5_id)
|
||||||
|
|
||||||
|
# ---- 6. Most improved / declined country ----
|
||||||
|
if most_improved_country and most_declined_country:
|
||||||
|
if most_improved_country != most_declined_country:
|
||||||
|
s6_en = (
|
||||||
|
f"{most_improved_country} showed the biggest improvement "
|
||||||
|
f"({_fmt_delta(most_improved_delta)} pts), "
|
||||||
|
f"while {most_declined_country} experienced the largest decline "
|
||||||
|
f"({_fmt_delta(most_declined_delta)} pts)."
|
||||||
|
)
|
||||||
|
s6_id = (
|
||||||
|
f"{most_improved_country} mencatat peningkatan terbesar "
|
||||||
|
f"({_fmt_delta(most_improved_delta)} poin), "
|
||||||
|
f"sementara {most_declined_country} mengalami penurunan terbesar "
|
||||||
|
f"({_fmt_delta(most_declined_delta)} poin)."
|
||||||
|
)
|
||||||
|
sentences_en.append(s6_en)
|
||||||
|
sentences_id.append(s6_id)
|
||||||
|
|
||||||
|
narrative_en = " ".join(s for s in sentences_en if s)
|
||||||
|
narrative_id = " ".join(s for s in sentences_id if s)
|
||||||
|
return narrative_en, narrative_id
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# NARRATIVE BUILDER — PILLAR (per tahun per pilar)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
def _build_pillar_narrative(
|
def _build_pillar_narrative(
|
||||||
year: int,
|
year: int,
|
||||||
@@ -283,70 +426,137 @@ def _build_pillar_narrative(
|
|||||||
rank_in_year: int,
|
rank_in_year: int,
|
||||||
n_pillars: int,
|
n_pillars: int,
|
||||||
yoy_val,
|
yoy_val,
|
||||||
top_country,
|
top_country: str,
|
||||||
top_country_score,
|
top_country_score,
|
||||||
bot_country,
|
bot_country: str,
|
||||||
bot_country_score,
|
bot_country_score,
|
||||||
strongest_pillar,
|
pillar_scores_history: dict, # {year: score} untuk pilar ini
|
||||||
strongest_score,
|
all_pillar_scores_year: pd.DataFrame, # df [pillar_name, pillar_score_1_100] tahun ini
|
||||||
weakest_pillar,
|
country_pillar_all: pd.DataFrame, # df [year, country_id, pillar_country_score_1_100] pilar ini
|
||||||
weakest_score,
|
) -> tuple:
|
||||||
most_improved_pillar,
|
|
||||||
most_improved_delta,
|
|
||||||
most_declined_pillar,
|
|
||||||
most_declined_delta,
|
|
||||||
) -> str:
|
|
||||||
"""
|
"""
|
||||||
Narrative format (no em-dash):
|
Narasi pillar per tahun — interpretatif, plain text, bilingual.
|
||||||
In {year}, {pillar} ranked {rank}/{n} with score {score}, {up/down} {delta} pts YoY.
|
Return: (narrative_en, narrative_id)
|
||||||
Top country: {top_country}; bottom: {bot_country}.
|
|
||||||
Strongest pillar: {pillar}; weakest: {pillar}.
|
|
||||||
"""
|
"""
|
||||||
|
sentences_en = []
|
||||||
|
sentences_id = []
|
||||||
|
|
||||||
|
# ---- 1. Posisi pilar tahun ini ----
|
||||||
rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th")
|
rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th")
|
||||||
|
perf_word_en = "good" if pillar_score >= PERFORMANCE_THRESHOLD else "below target"
|
||||||
|
perf_word_id = "baik" if pillar_score >= PERFORMANCE_THRESHOLD else "di bawah target"
|
||||||
|
|
||||||
# Sentence 1: rank + score + YoY
|
s1_en = (
|
||||||
if yoy_val is not None:
|
f"In {year}, the {pillar_name} pillar ranked {rank_in_year}{rank_suffix} out of "
|
||||||
direction_word = "up" if yoy_val >= 0 else "down"
|
f"{n_pillars} pillars with a score of {_fmt_score(pillar_score)} ({perf_word_en})."
|
||||||
yoy_clause = f", {direction_word} **{abs(yoy_val):.2f} pts** YoY"
|
)
|
||||||
|
s1_id = (
|
||||||
|
f"Pada tahun {year}, pilar {pillar_name} menempati peringkat {rank_in_year} dari "
|
||||||
|
f"{n_pillars} pilar dengan skor {_fmt_score(pillar_score)} ({perf_word_id})."
|
||||||
|
)
|
||||||
|
sentences_en.append(s1_en)
|
||||||
|
sentences_id.append(s1_id)
|
||||||
|
|
||||||
|
# ---- 2. YoY pilar ini ----
|
||||||
|
if yoy_val is not None and not pd.isna(yoy_val):
|
||||||
|
if abs(yoy_val) < 0.5:
|
||||||
|
s2_en = "Performance was relatively stable compared to the previous year."
|
||||||
|
s2_id = "Performa relatif stabil dibandingkan tahun sebelumnya."
|
||||||
|
elif yoy_val > 0:
|
||||||
|
s2_en = f"This is an improvement of {abs(yoy_val):.2f} points from the previous year."
|
||||||
|
s2_id = f"Ini merupakan peningkatan {abs(yoy_val):.2f} poin dari tahun sebelumnya."
|
||||||
else:
|
else:
|
||||||
yoy_clause = ", no prior-year data"
|
s2_en = f"This marks a decline of {abs(yoy_val):.2f} points from the previous year."
|
||||||
|
s2_id = f"Ini menandai penurunan {abs(yoy_val):.2f} poin dari tahun sebelumnya."
|
||||||
|
sentences_en.append(s2_en)
|
||||||
|
sentences_id.append(s2_id)
|
||||||
|
|
||||||
sent1 = (
|
# ---- 3. Tren historis pilar ini ----
|
||||||
f"In **{year}**, **{pillar_name}** ranked **{rank_in_year}{rank_suffix}/{n_pillars}** "
|
hist_years = sorted(pillar_scores_history.keys())
|
||||||
f"with score **{_fmt_score(pillar_score)}**{yoy_clause}."
|
hist_scores = [
|
||||||
)
|
pillar_scores_history[y]
|
||||||
|
for y in hist_years
|
||||||
|
if not pd.isna(pillar_scores_history.get(y, np.nan))
|
||||||
|
]
|
||||||
|
|
||||||
# Sentence 2: top / bottom country
|
if len(hist_scores) >= 3:
|
||||||
sent2 = ""
|
trend = _detect_series_trend(hist_scores)
|
||||||
if top_country and bot_country:
|
if trend == "improving_consistent":
|
||||||
if top_country != bot_country:
|
s3_en = f"This pillar has shown consistent improvement since {hist_years[0]}."
|
||||||
sent2 = (
|
s3_id = f"Pilar ini menunjukkan perbaikan yang konsisten sejak {hist_years[0]}."
|
||||||
f"Top country: **{top_country}** ({_fmt_score(top_country_score)}); "
|
elif trend == "improving_slowing":
|
||||||
f"bottom: **{bot_country}** ({_fmt_score(bot_country_score)})."
|
s3_en = f"While the pillar improved since {hist_years[0]}, the pace has slowed in recent years."
|
||||||
)
|
s3_id = f"Meskipun pilar ini membaik sejak {hist_years[0]}, lajunya melambat dalam beberapa tahun terakhir."
|
||||||
|
elif trend == "deteriorating":
|
||||||
|
s3_en = f"This pillar has shown a declining trend since {hist_years[0]}, requiring targeted intervention."
|
||||||
|
s3_id = f"Pilar ini menunjukkan tren penurunan sejak {hist_years[0]}, memerlukan intervensi yang terarah."
|
||||||
|
elif trend == "fluctuating":
|
||||||
|
s3_en = f"Performance in this pillar has been inconsistent since {hist_years[0]}, with no clear trend."
|
||||||
|
s3_id = f"Performa pilar ini tidak konsisten sejak {hist_years[0]}, tanpa tren yang jelas."
|
||||||
else:
|
else:
|
||||||
sent2 = f"**{top_country}** was the only country with data ({_fmt_score(top_country_score)})."
|
s3_en = ""
|
||||||
|
s3_id = ""
|
||||||
|
|
||||||
# Sentence 3: strongest / weakest pillar
|
if s3_en:
|
||||||
sent3 = ""
|
sentences_en.append(s3_en)
|
||||||
if strongest_pillar and weakest_pillar:
|
sentences_id.append(s3_id)
|
||||||
sent3 = (
|
|
||||||
f"Strongest pillar: **{strongest_pillar}** ({_fmt_score(strongest_score)}); "
|
# ---- 4. Gap antar negara dalam pilar ini ----
|
||||||
f"weakest: **{weakest_pillar}** ({_fmt_score(weakest_score)})."
|
if not country_pillar_all.empty:
|
||||||
|
gap_trend = _detect_country_gap(
|
||||||
|
country_pillar_all[country_pillar_all["year"] <= year],
|
||||||
|
"pillar_country_score_1_100"
|
||||||
)
|
)
|
||||||
|
if gap_trend == "widening":
|
||||||
|
s4_en = "Country disparities within this pillar have widened over time."
|
||||||
|
s4_id = "Kesenjangan antar negara dalam pilar ini semakin melebar seiring waktu."
|
||||||
|
elif gap_trend == "narrowing":
|
||||||
|
s4_en = "Country disparities within this pillar have narrowed, indicating more balanced progress."
|
||||||
|
s4_id = "Kesenjangan antar negara dalam pilar ini menyempit, mengindikasikan kemajuan yang lebih merata."
|
||||||
|
else:
|
||||||
|
s4_en = ""
|
||||||
|
s4_id = ""
|
||||||
|
|
||||||
# Sentence 4: most improved / declined pillar
|
if s4_en:
|
||||||
sent4_parts = []
|
sentences_en.append(s4_en)
|
||||||
if most_improved_pillar and most_improved_delta is not None:
|
sentences_id.append(s4_id)
|
||||||
sent4_parts.append(f"Best gain: **{most_improved_pillar}** ({_fmt_delta(most_improved_delta)} pts)")
|
|
||||||
if most_declined_pillar and most_declined_delta is not None:
|
|
||||||
sent4_parts.append(f"largest drop: **{most_declined_pillar}** ({_fmt_delta(most_declined_delta)} pts)")
|
|
||||||
sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else ""
|
|
||||||
if sent4:
|
|
||||||
sent4 = sent4[0].upper() + sent4[1:]
|
|
||||||
|
|
||||||
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
# ---- 5. Top/bottom country dalam pilar ini ----
|
||||||
|
if top_country and bot_country and top_country != bot_country:
|
||||||
|
s5_en = (
|
||||||
|
f"{top_country} performed best in this pillar ({_fmt_score(top_country_score)}), "
|
||||||
|
f"while {bot_country} had the lowest score ({_fmt_score(bot_country_score)})."
|
||||||
|
)
|
||||||
|
s5_id = (
|
||||||
|
f"{top_country} memiliki performa terbaik dalam pilar ini ({_fmt_score(top_country_score)}), "
|
||||||
|
f"sementara {bot_country} memiliki skor terendah ({_fmt_score(bot_country_score)})."
|
||||||
|
)
|
||||||
|
sentences_en.append(s5_en)
|
||||||
|
sentences_id.append(s5_id)
|
||||||
|
|
||||||
|
# ---- 6. Posisi relatif pilar ini vs pilar lain ----
|
||||||
|
if not all_pillar_scores_year.empty and len(all_pillar_scores_year) > 1:
|
||||||
|
sorted_pillars = all_pillar_scores_year.sort_values("pillar_score_1_100", ascending=False)
|
||||||
|
strongest = sorted_pillars.iloc[0]
|
||||||
|
weakest = sorted_pillars.iloc[-1]
|
||||||
|
|
||||||
|
if strongest["pillar_name"] != pillar_name and weakest["pillar_name"] != pillar_name:
|
||||||
|
s6_en = (
|
||||||
|
f"Across all pillars in {year}, {strongest['pillar_name']} scored highest "
|
||||||
|
f"({_fmt_score(strongest['pillar_score_1_100'])}) and {weakest['pillar_name']} "
|
||||||
|
f"scored lowest ({_fmt_score(weakest['pillar_score_1_100'])})."
|
||||||
|
)
|
||||||
|
s6_id = (
|
||||||
|
f"Di antara semua pilar pada tahun {year}, {strongest['pillar_name']} mendapat skor "
|
||||||
|
f"tertinggi ({_fmt_score(strongest['pillar_score_1_100'])}) dan {weakest['pillar_name']} "
|
||||||
|
f"mendapat skor terendah ({_fmt_score(weakest['pillar_score_1_100'])})."
|
||||||
|
)
|
||||||
|
sentences_en.append(s6_en)
|
||||||
|
sentences_id.append(s6_id)
|
||||||
|
|
||||||
|
narrative_en = " ".join(s for s in sentences_en if s)
|
||||||
|
narrative_id = " ".join(s for s in sentences_id if s)
|
||||||
|
return narrative_en, narrative_id
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
@@ -371,8 +581,6 @@ class FoodSecurityAggregator:
|
|||||||
|
|
||||||
self.df = None
|
self.df = None
|
||||||
self.sdgs_start_year = None
|
self.sdgs_start_year = None
|
||||||
|
|
||||||
# Lookup: (indicator_id, year) -> framework label
|
|
||||||
self._ind_year_framework: pd.DataFrame = None
|
self._ind_year_framework: pd.DataFrame = None
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
@@ -398,31 +606,23 @@ class FoodSecurityAggregator:
|
|||||||
missing_cols = required_cols - set(self.df.columns)
|
missing_cols = required_cols - set(self.df.columns)
|
||||||
if missing_cols:
|
if missing_cols:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Kolom berikut tidak ditemukan di fact_asean_food_security_selected: {missing_cols}"
|
f"Kolom berikut tidak ditemukan: {missing_cols}"
|
||||||
)
|
)
|
||||||
|
|
||||||
n_null_dir = self.df["direction"].isna().sum()
|
n_null_dir = self.df["direction"].isna().sum()
|
||||||
if n_null_dir > 0:
|
if n_null_dir > 0:
|
||||||
self.logger.warning(
|
self.logger.warning(f" [DIRECTION] {n_null_dir} rows NULL -> diisi 'positive'")
|
||||||
f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'"
|
|
||||||
)
|
|
||||||
self.df["direction"] = self.df["direction"].fillna("positive")
|
self.df["direction"] = self.df["direction"].fillna("positive")
|
||||||
|
|
||||||
dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts()
|
self.logger.info(f" Rows : {len(self.df):,}")
|
||||||
self.logger.info(f"\n Distribusi direction per indikator:")
|
self.logger.info(f" Countries : {self.df['country_id'].nunique()}")
|
||||||
for d, cnt in dir_dist.items():
|
self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}")
|
||||||
tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal"
|
|
||||||
self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]")
|
|
||||||
|
|
||||||
self.logger.info(f"\n Rows loaded : {len(self.df):,}")
|
|
||||||
self.logger.info(f" Negara : {self.df['country_id'].nunique()}")
|
|
||||||
self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}")
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}"
|
f" Years : {int(self.df['year'].min())} - {int(self.df['year'].max())}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 1b: Detect sdgs_start_year + assign framework per (indicator, year)
|
# STEP 1b: Detect sdgs_start_year + assign framework
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _detect_sdgs_start_year(self) -> int:
|
def _detect_sdgs_start_year(self) -> int:
|
||||||
@@ -440,7 +640,6 @@ class FoodSecurityAggregator:
|
|||||||
)
|
)
|
||||||
unique_years = sorted(ind_min_year["min_year"].unique())
|
unique_years = sorted(ind_min_year["min_year"].unique())
|
||||||
if len(unique_years) == 1:
|
if len(unique_years) == 1:
|
||||||
self.logger.info(" [Fallback] Hanya 1 cluster -> semua MDGs")
|
|
||||||
return int(unique_years[0]) + 9999
|
return int(unique_years[0]) + 9999
|
||||||
|
|
||||||
gaps = [
|
gaps = [
|
||||||
@@ -449,12 +648,12 @@ class FoodSecurityAggregator:
|
|||||||
]
|
]
|
||||||
gaps.sort(reverse=True)
|
gaps.sort(reverse=True)
|
||||||
_, y_before, y_after = gaps[0]
|
_, y_before, y_after = gaps[0]
|
||||||
self.logger.info(f" [Fallback gap] sdgs_start_year = {y_after} (gap {y_before}->{y_after})")
|
self.logger.info(f" [Fallback gap] sdgs_start_year = {y_after}")
|
||||||
return int(y_after)
|
return int(y_after)
|
||||||
|
|
||||||
def _assign_framework_labels(self):
|
def _assign_framework_labels(self):
|
||||||
self.logger.info("\n" + "=" * 70)
|
self.logger.info("\n" + "=" * 70)
|
||||||
self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS (per indicator per year)")
|
self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS")
|
||||||
self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
|
self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
|
||||||
self.logger.info("=" * 70)
|
self.logger.info("=" * 70)
|
||||||
|
|
||||||
@@ -477,21 +676,6 @@ class FoodSecurityAggregator:
|
|||||||
for fw, cnt in fw_dist.items():
|
for fw, cnt in fw_dist.items():
|
||||||
self.logger.info(f" {fw:<6}: {cnt:,} rows")
|
self.logger.info(f" {fw:<6}: {cnt:,} rows")
|
||||||
|
|
||||||
ind_fw_yr = (
|
|
||||||
self._ind_year_framework
|
|
||||||
.groupby(["year", "framework"])["indicator_id"]
|
|
||||||
.nunique()
|
|
||||||
.reset_index()
|
|
||||||
.rename(columns={"indicator_id": "n_indicators"})
|
|
||||||
.sort_values(["year", "framework"])
|
|
||||||
)
|
|
||||||
self.logger.info(f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}")
|
|
||||||
self.logger.info(" " + "-" * 30)
|
|
||||||
for _, r in ind_fw_yr.iterrows():
|
|
||||||
self.logger.info(
|
|
||||||
f" {int(r['year']):<6} {r['framework']:<8} {int(r['n_indicators'])}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _count_framework_indicators(self, year: int, framework: str) -> int:
|
def _count_framework_indicators(self, year: int, framework: str) -> int:
|
||||||
mask = (
|
mask = (
|
||||||
(self._ind_year_framework["year"] == year) &
|
(self._ind_year_framework["year"] == year) &
|
||||||
@@ -505,9 +689,7 @@ class FoodSecurityAggregator:
|
|||||||
|
|
||||||
def _get_norm_value_df(self) -> pd.DataFrame:
|
def _get_norm_value_df(self) -> pd.DataFrame:
|
||||||
if "framework" not in self.df.columns:
|
if "framework" not in self.df.columns:
|
||||||
raise ValueError(
|
raise ValueError("Kolom 'framework' tidak ada.")
|
||||||
"Kolom 'framework' tidak ada. Pastikan _assign_framework_labels() dipanggil lebih dulu."
|
|
||||||
)
|
|
||||||
|
|
||||||
norm_parts = []
|
norm_parts = []
|
||||||
for ind_id, grp in self.df.groupby("indicator_id"):
|
for ind_id, grp in self.df.groupby("indicator_id"):
|
||||||
@@ -737,7 +919,6 @@ class FoodSecurityAggregator:
|
|||||||
df_normed = self._get_norm_value_df()
|
df_normed = self._get_norm_value_df()
|
||||||
parts = []
|
parts = []
|
||||||
|
|
||||||
# TOTAL
|
|
||||||
agg_total = (
|
agg_total = (
|
||||||
country_composite[[
|
country_composite[[
|
||||||
"country_id", "country_name", "year",
|
"country_id", "country_name", "year",
|
||||||
@@ -752,7 +933,6 @@ class FoodSecurityAggregator:
|
|||||||
agg_total["framework"] = "Total"
|
agg_total["framework"] = "Total"
|
||||||
parts.append(agg_total)
|
parts.append(agg_total)
|
||||||
|
|
||||||
# MDGs pre-SDGs
|
|
||||||
pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy()
|
pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy()
|
||||||
if not pre_sdgs_rows.empty:
|
if not pre_sdgs_rows.empty:
|
||||||
mdgs_pre = (
|
mdgs_pre = (
|
||||||
@@ -769,7 +949,6 @@ class FoodSecurityAggregator:
|
|||||||
mdgs_pre["framework"] = "MDGs"
|
mdgs_pre["framework"] = "MDGs"
|
||||||
parts.append(mdgs_pre)
|
parts.append(mdgs_pre)
|
||||||
|
|
||||||
# MDGs mixed (year >= sdgs_start_year, hanya indikator MDGs)
|
|
||||||
mdgs_indicator_ids = set(
|
mdgs_indicator_ids = set(
|
||||||
self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"]
|
self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"]
|
||||||
)
|
)
|
||||||
@@ -793,7 +972,6 @@ class FoodSecurityAggregator:
|
|||||||
agg_mdgs_mixed["framework"] = "MDGs"
|
agg_mdgs_mixed["framework"] = "MDGs"
|
||||||
parts.append(agg_mdgs_mixed)
|
parts.append(agg_mdgs_mixed)
|
||||||
|
|
||||||
# SDGs
|
|
||||||
sdgs_indicator_ids = set(
|
sdgs_indicator_ids = set(
|
||||||
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"]
|
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"]
|
||||||
)
|
)
|
||||||
@@ -864,7 +1042,7 @@ class FoodSecurityAggregator:
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 5: agg_framework_asean (+ performance_status)
|
# STEP 5: agg_framework_asean
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def calc_framework_asean(self) -> pd.DataFrame:
|
def calc_framework_asean(self) -> pd.DataFrame:
|
||||||
@@ -872,7 +1050,6 @@ class FoodSecurityAggregator:
|
|||||||
self.load_metadata[table_name]["start_time"] = datetime.now()
|
self.load_metadata[table_name]["start_time"] = datetime.now()
|
||||||
self.logger.info("\n" + "=" * 70)
|
self.logger.info("\n" + "=" * 70)
|
||||||
self.logger.info(f"STEP 5: {table_name} -> [Gold] fs_asean_gold")
|
self.logger.info(f"STEP 5: {table_name} -> [Gold] fs_asean_gold")
|
||||||
self.logger.info(f" performance_status threshold: {PERFORMANCE_THRESHOLD}")
|
|
||||||
self.logger.info("=" * 70)
|
self.logger.info("=" * 70)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -907,7 +1084,6 @@ class FoodSecurityAggregator:
|
|||||||
def _n_ind(year_val, framework_val):
|
def _n_ind(year_val, framework_val):
|
||||||
return self._count_framework_indicators(year_val, framework_val)
|
return self._count_framework_indicators(year_val, framework_val)
|
||||||
|
|
||||||
# TOTAL
|
|
||||||
total_cols = asean_overall[[
|
total_cols = asean_overall[[
|
||||||
"year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"
|
"year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"
|
||||||
]].copy().rename(columns={
|
]].copy().rename(columns={
|
||||||
@@ -923,7 +1099,6 @@ class FoodSecurityAggregator:
|
|||||||
total_cols["framework"] = "Total"
|
total_cols["framework"] = "Total"
|
||||||
parts.append(total_cols)
|
parts.append(total_cols)
|
||||||
|
|
||||||
# MDGs pre-SDGs
|
|
||||||
pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy()
|
pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy()
|
||||||
if not pre_sdgs.empty:
|
if not pre_sdgs.empty:
|
||||||
mdgs_pre = pre_sdgs[[
|
mdgs_pre = pre_sdgs[[
|
||||||
@@ -937,7 +1112,6 @@ class FoodSecurityAggregator:
|
|||||||
mdgs_pre["framework"] = "MDGs"
|
mdgs_pre["framework"] = "MDGs"
|
||||||
parts.append(mdgs_pre)
|
parts.append(mdgs_pre)
|
||||||
|
|
||||||
# MDGs mixed
|
|
||||||
mdgs_indicator_ids = set(
|
mdgs_indicator_ids = set(
|
||||||
self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"]
|
self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"]
|
||||||
)
|
)
|
||||||
@@ -963,7 +1137,6 @@ class FoodSecurityAggregator:
|
|||||||
asean_mdgs["framework"] = "MDGs"
|
asean_mdgs["framework"] = "MDGs"
|
||||||
parts.append(asean_mdgs)
|
parts.append(asean_mdgs)
|
||||||
|
|
||||||
# SDGs
|
|
||||||
sdgs_indicator_ids = set(
|
sdgs_indicator_ids = set(
|
||||||
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"]
|
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"]
|
||||||
)
|
)
|
||||||
@@ -1005,21 +1178,9 @@ class FoodSecurityAggregator:
|
|||||||
df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger)
|
df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger)
|
||||||
for col in ["framework_norm", "std_norm", "framework_score_1_100"]:
|
for col in ["framework_norm", "std_norm", "framework_score_1_100"]:
|
||||||
df[col] = df[col].astype(float)
|
df[col] = df[col].astype(float)
|
||||||
df["performance_status"] = df["performance_status"].astype(str)
|
|
||||||
|
|
||||||
self._validate_mdgs_equals_total(df, level="asean")
|
self._validate_mdgs_equals_total(df, level="asean")
|
||||||
|
|
||||||
self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):")
|
|
||||||
for fw in df["framework"].unique():
|
|
||||||
sub = df[df["framework"] == fw].sort_values("year")
|
|
||||||
for _, r in sub.iterrows():
|
|
||||||
self.logger.info(
|
|
||||||
f" {fw:<8} {int(r['year'])}: "
|
|
||||||
f"score={r['framework_score_1_100']:.2f} "
|
|
||||||
f"n_ind={int(r['n_indicators'])} "
|
|
||||||
f"-> {r['performance_status']}"
|
|
||||||
)
|
|
||||||
|
|
||||||
schema = [
|
schema = [
|
||||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||||
@@ -1055,6 +1216,7 @@ class FoodSecurityAggregator:
|
|||||||
self.load_metadata[table_name]["start_time"] = datetime.now()
|
self.load_metadata[table_name]["start_time"] = datetime.now()
|
||||||
self.logger.info("\n" + "=" * 70)
|
self.logger.info("\n" + "=" * 70)
|
||||||
self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold")
|
self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold")
|
||||||
|
self.logger.info(" Narrative: interpretatif, plain text, bilingual EN/ID")
|
||||||
self.logger.info("=" * 70)
|
self.logger.info("=" * 70)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -1122,23 +1284,23 @@ class FoodSecurityAggregator:
|
|||||||
most_improved_country = most_declined_country = None
|
most_improved_country = most_declined_country = None
|
||||||
most_improved_delta = most_declined_delta = None
|
most_improved_delta = most_declined_delta = None
|
||||||
|
|
||||||
narrative = _build_overview_narrative(
|
# Semua data skor negara untuk gap analysis
|
||||||
|
country_scores_all = country_total[["year", "country_id", "framework_score_1_100"]].copy()
|
||||||
|
|
||||||
|
narrative_en, narrative_id = _build_overview_narrative(
|
||||||
year = yr,
|
year = yr,
|
||||||
n_mdg = n_mdg,
|
|
||||||
n_sdg = n_sdg,
|
|
||||||
n_total_ind = n_total_ind,
|
|
||||||
score = score,
|
score = score,
|
||||||
performance_status = perf_status,
|
performance_status = perf_status,
|
||||||
yoy_val = yoy_val,
|
yoy_val = yoy_val,
|
||||||
yoy_pct = yoy_pct,
|
n_mdg = n_mdg,
|
||||||
prev_year = yr - 1,
|
n_sdg = n_sdg,
|
||||||
prev_score = prev_score,
|
|
||||||
prev_performance_status = prev_status,
|
|
||||||
ranking_list = ranking_list,
|
ranking_list = ranking_list,
|
||||||
most_improved_country = most_improved_country,
|
most_improved_country = most_improved_country,
|
||||||
most_improved_delta = most_improved_delta,
|
most_improved_delta = most_improved_delta,
|
||||||
most_declined_country = most_declined_country,
|
most_declined_country = most_declined_country,
|
||||||
most_declined_delta = most_declined_delta,
|
most_declined_delta = most_declined_delta,
|
||||||
|
historical_scores = score_by_year,
|
||||||
|
country_scores_all = country_scores_all,
|
||||||
)
|
)
|
||||||
|
|
||||||
records.append({
|
records.append({
|
||||||
@@ -1155,7 +1317,8 @@ class FoodSecurityAggregator:
|
|||||||
"most_improved_delta": most_improved_delta,
|
"most_improved_delta": most_improved_delta,
|
||||||
"most_declined_country": most_declined_country,
|
"most_declined_country": most_declined_country,
|
||||||
"most_declined_delta": most_declined_delta,
|
"most_declined_delta": most_declined_delta,
|
||||||
"narrative_overview": narrative,
|
"narrative_en": narrative_en,
|
||||||
|
"narrative_id": narrative_id,
|
||||||
})
|
})
|
||||||
|
|
||||||
df = pd.DataFrame(records)
|
df = pd.DataFrame(records)
|
||||||
@@ -1165,9 +1328,16 @@ class FoodSecurityAggregator:
|
|||||||
df["n_total_indicators"] = df["n_total_indicators"].astype(int)
|
df["n_total_indicators"] = df["n_total_indicators"].astype(int)
|
||||||
df["asean_total_score"] = df["asean_total_score"].astype(float)
|
df["asean_total_score"] = df["asean_total_score"].astype(float)
|
||||||
df["performance_status"] = df["performance_status"].astype(str)
|
df["performance_status"] = df["performance_status"].astype(str)
|
||||||
|
df["narrative_en"] = df["narrative_en"].astype(str)
|
||||||
|
df["narrative_id"] = df["narrative_id"].astype(str)
|
||||||
for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]:
|
for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]:
|
||||||
df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
|
df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
|
||||||
|
|
||||||
|
self.logger.info("\n Sample narrative_en (year 1):")
|
||||||
|
self.logger.info(f" {df.iloc[0]['narrative_en'][:300]}")
|
||||||
|
self.logger.info("\n Sample narrative_id (year 1):")
|
||||||
|
self.logger.info(f" {df.iloc[0]['narrative_id'][:300]}")
|
||||||
|
|
||||||
schema = [
|
schema = [
|
||||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||||
bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"),
|
||||||
@@ -1182,7 +1352,8 @@ class FoodSecurityAggregator:
|
|||||||
bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"),
|
bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"),
|
bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"),
|
bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"),
|
bigquery.SchemaField("narrative_en", "STRING", mode="REQUIRED"),
|
||||||
|
bigquery.SchemaField("narrative_id", "STRING", mode="REQUIRED"),
|
||||||
]
|
]
|
||||||
rows = load_to_bigquery(
|
rows = load_to_bigquery(
|
||||||
self.client, df, table_name, layer='gold',
|
self.client, df, table_name, layer='gold',
|
||||||
@@ -1208,12 +1379,20 @@ class FoodSecurityAggregator:
|
|||||||
self.load_metadata[table_name]["start_time"] = datetime.now()
|
self.load_metadata[table_name]["start_time"] = datetime.now()
|
||||||
self.logger.info("\n" + "=" * 70)
|
self.logger.info("\n" + "=" * 70)
|
||||||
self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold")
|
self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold")
|
||||||
|
self.logger.info(" Narrative: interpretatif, plain text, bilingual EN/ID")
|
||||||
self.logger.info("=" * 70)
|
self.logger.info("=" * 70)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
records = []
|
records = []
|
||||||
years = sorted(df_pillar_composite["year"].unique())
|
years = sorted(df_pillar_composite["year"].unique())
|
||||||
|
|
||||||
|
# Precompute history per pillar
|
||||||
|
pillar_history = {}
|
||||||
|
for p_id, grp in df_pillar_composite.groupby("pillar_id"):
|
||||||
|
pillar_history[p_id] = dict(
|
||||||
|
zip(grp["year"].astype(int), grp["pillar_score_1_100"].astype(float))
|
||||||
|
)
|
||||||
|
|
||||||
for yr in years:
|
for yr in years:
|
||||||
yr_pillars = (
|
yr_pillars = (
|
||||||
df_pillar_composite[df_pillar_composite["year"] == yr]
|
df_pillar_composite[df_pillar_composite["year"] == yr]
|
||||||
@@ -1222,21 +1401,6 @@ class FoodSecurityAggregator:
|
|||||||
)
|
)
|
||||||
yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr]
|
yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr]
|
||||||
|
|
||||||
strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None
|
|
||||||
weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None
|
|
||||||
|
|
||||||
yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"])
|
|
||||||
if not yr_pillars_yoy.empty:
|
|
||||||
best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax()
|
|
||||||
worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin()
|
|
||||||
most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"])
|
|
||||||
most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2)
|
|
||||||
most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"])
|
|
||||||
most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2)
|
|
||||||
else:
|
|
||||||
most_improved_pillar = most_declined_pillar = None
|
|
||||||
most_improved_delta = most_declined_delta = None
|
|
||||||
|
|
||||||
for _, prow in yr_pillars.iterrows():
|
for _, prow in yr_pillars.iterrows():
|
||||||
p_id = int(prow["pillar_id"])
|
p_id = int(prow["pillar_id"])
|
||||||
p_name = str(prow["pillar_name"])
|
p_name = str(prow["pillar_name"])
|
||||||
@@ -1259,7 +1423,17 @@ class FoodSecurityAggregator:
|
|||||||
top_country = bot_country = None
|
top_country = bot_country = None
|
||||||
top_country_score = bot_country_score = None
|
top_country_score = bot_country_score = None
|
||||||
|
|
||||||
narrative = _build_pillar_narrative(
|
# Data historis hanya sampai tahun ini
|
||||||
|
hist_up_to_yr = {
|
||||||
|
y: s for y, s in pillar_history.get(p_id, {}).items() if y <= yr
|
||||||
|
}
|
||||||
|
|
||||||
|
# Data negara-pilar ini semua tahun (untuk gap analysis)
|
||||||
|
country_pillar_all = df_pillar_by_country[
|
||||||
|
df_pillar_by_country["pillar_id"] == p_id
|
||||||
|
][["year", "country_id", "pillar_country_score_1_100"]].copy()
|
||||||
|
|
||||||
|
narrative_en, narrative_id = _build_pillar_narrative(
|
||||||
year = yr,
|
year = yr,
|
||||||
pillar_name = p_name,
|
pillar_name = p_name,
|
||||||
pillar_score = p_score,
|
pillar_score = p_score,
|
||||||
@@ -1270,14 +1444,9 @@ class FoodSecurityAggregator:
|
|||||||
top_country_score = top_country_score,
|
top_country_score = top_country_score,
|
||||||
bot_country = bot_country,
|
bot_country = bot_country,
|
||||||
bot_country_score = bot_country_score,
|
bot_country_score = bot_country_score,
|
||||||
strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None,
|
pillar_scores_history = hist_up_to_yr,
|
||||||
strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None,
|
all_pillar_scores_year= yr_pillars[["pillar_name", "pillar_score_1_100"]].copy(),
|
||||||
weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None,
|
country_pillar_all = country_pillar_all,
|
||||||
weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None,
|
|
||||||
most_improved_pillar = most_improved_pillar,
|
|
||||||
most_improved_delta = most_improved_delta,
|
|
||||||
most_declined_pillar = most_declined_pillar,
|
|
||||||
most_declined_delta = most_declined_delta,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
records.append({
|
records.append({
|
||||||
@@ -1291,16 +1460,24 @@ class FoodSecurityAggregator:
|
|||||||
"top_country_score": top_country_score,
|
"top_country_score": top_country_score,
|
||||||
"bottom_country": bot_country,
|
"bottom_country": bot_country,
|
||||||
"bottom_country_score": bot_country_score,
|
"bottom_country_score": bot_country_score,
|
||||||
"narrative_pillar": narrative,
|
"narrative_en": narrative_en,
|
||||||
|
"narrative_id": narrative_id,
|
||||||
})
|
})
|
||||||
|
|
||||||
df = pd.DataFrame(records)
|
df = pd.DataFrame(records)
|
||||||
df["year"] = df["year"].astype(int)
|
df["year"] = df["year"].astype(int)
|
||||||
df["pillar_id"] = df["pillar_id"].astype(int)
|
df["pillar_id"] = df["pillar_id"].astype(int)
|
||||||
df["rank_in_year"] = df["rank_in_year"].astype(int)
|
df["rank_in_year"] = df["rank_in_year"].astype(int)
|
||||||
|
df["narrative_en"] = df["narrative_en"].astype(str)
|
||||||
|
df["narrative_id"] = df["narrative_id"].astype(str)
|
||||||
for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]:
|
for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]:
|
||||||
df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
|
df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
|
||||||
|
|
||||||
|
self.logger.info("\n Sample narrative_en (first row):")
|
||||||
|
self.logger.info(f" {df.iloc[0]['narrative_en'][:300]}")
|
||||||
|
self.logger.info("\n Sample narrative_id (first row):")
|
||||||
|
self.logger.info(f" {df.iloc[0]['narrative_id'][:300]}")
|
||||||
|
|
||||||
schema = [
|
schema = [
|
||||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||||
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
|
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
|
||||||
@@ -1312,7 +1489,8 @@ class FoodSecurityAggregator:
|
|||||||
bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"),
|
bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"),
|
bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"),
|
bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"),
|
||||||
bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"),
|
bigquery.SchemaField("narrative_en", "STRING", mode="REQUIRED"),
|
||||||
|
bigquery.SchemaField("narrative_id", "STRING", mode="REQUIRED"),
|
||||||
]
|
]
|
||||||
rows = load_to_bigquery(
|
rows = load_to_bigquery(
|
||||||
self.client, df, table_name, layer='gold',
|
self.client, df, table_name, layer='gold',
|
||||||
@@ -1338,7 +1516,7 @@ class FoodSecurityAggregator:
|
|||||||
self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)")
|
self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)")
|
||||||
return
|
return
|
||||||
if mdgs_pre.empty or total_pre.empty:
|
if mdgs_pre.empty or total_pre.empty:
|
||||||
self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}")
|
self.logger.warning(f" -> [WARNING] Salah satu kosong")
|
||||||
return
|
return
|
||||||
check = mdgs_pre.merge(total_pre, on=group_by)
|
check = mdgs_pre.merge(total_pre, on=group_by)
|
||||||
max_diff = (check["mdgs_score"] - check["total_score"]).abs().max()
|
max_diff = (check["mdgs_score"] - check["total_score"]).abs().max()
|
||||||
@@ -1348,15 +1526,12 @@ class FoodSecurityAggregator:
|
|||||||
def _finalize(self, table_name: str, rows_loaded: int):
|
def _finalize(self, table_name: str, rows_loaded: int):
|
||||||
end_time = datetime.now()
|
end_time = datetime.now()
|
||||||
start_time = self.load_metadata[table_name].get("start_time")
|
start_time = self.load_metadata[table_name].get("start_time")
|
||||||
|
|
||||||
self.load_metadata[table_name].update({
|
self.load_metadata[table_name].update({
|
||||||
"rows_loaded": rows_loaded,
|
"rows_loaded": rows_loaded,
|
||||||
"status" : "success",
|
"status" : "success",
|
||||||
"end_time" : end_time,
|
"end_time" : end_time,
|
||||||
})
|
})
|
||||||
|
|
||||||
log_update(self.client, "DW", table_name, "full_load", rows_loaded)
|
log_update(self.client, "DW", table_name, "full_load", rows_loaded)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
save_etl_metadata(
|
save_etl_metadata(
|
||||||
self.client,
|
self.client,
|
||||||
@@ -1369,24 +1544,15 @@ class FoodSecurityAggregator:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
except Exception as meta_err:
|
except Exception as meta_err:
|
||||||
self.logger.warning(
|
self.logger.warning(f" [METADATA WARNING] {table_name}: {meta_err}")
|
||||||
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
|
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
|
||||||
|
|
||||||
def _fail(self, table_name: str, error: Exception):
|
def _fail(self, table_name: str, error: Exception):
|
||||||
end_time = datetime.now()
|
end_time = datetime.now()
|
||||||
start_time = self.load_metadata[table_name].get("start_time")
|
start_time = self.load_metadata[table_name].get("start_time")
|
||||||
error_msg = str(error)
|
error_msg = str(error)
|
||||||
|
self.load_metadata[table_name].update({"status": "failed", "end_time": end_time})
|
||||||
self.load_metadata[table_name].update({
|
|
||||||
"status" : "failed",
|
|
||||||
"end_time": end_time,
|
|
||||||
})
|
|
||||||
|
|
||||||
log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg)
|
log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
save_etl_metadata(
|
save_etl_metadata(
|
||||||
self.client,
|
self.client,
|
||||||
@@ -1400,10 +1566,7 @@ class FoodSecurityAggregator:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
except Exception as meta_err:
|
except Exception as meta_err:
|
||||||
self.logger.warning(
|
self.logger.warning(f" [METADATA WARNING] {table_name}: {meta_err}")
|
||||||
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger.error(f" [FAIL] {table_name}: {error_msg}")
|
self.logger.error(f" [FAIL] {table_name}: {error_msg}")
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
@@ -1414,11 +1577,8 @@ class FoodSecurityAggregator:
|
|||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
self.logger.info("\n" + "=" * 70)
|
self.logger.info("\n" + "=" * 70)
|
||||||
self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold")
|
self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold")
|
||||||
self.logger.info(" Source : fact_asean_food_security_selected")
|
self.logger.info(f" Performance threshold: {PERFORMANCE_THRESHOLD}")
|
||||||
self.logger.info(" Outputs : agg_pillar_composite | agg_pillar_by_country")
|
self.logger.info(f" Narrative style : interpretive, plain text, bilingual EN/ID")
|
||||||
self.logger.info(" agg_framework_by_country | agg_framework_asean")
|
|
||||||
self.logger.info(" agg_narrative_overview | agg_narrative_pillar")
|
|
||||||
self.logger.info(f" Performance threshold: {PERFORMANCE_THRESHOLD} (Good/Bad)")
|
|
||||||
self.logger.info("=" * 70)
|
self.logger.info("=" * 70)
|
||||||
|
|
||||||
self.load_data()
|
self.load_data()
|
||||||
@@ -1479,7 +1639,6 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
print("=" * 70)
|
print("=" * 70)
|
||||||
print("FOOD SECURITY AGGREGATION -> fs_asean_gold")
|
print("FOOD SECURITY AGGREGATION -> fs_asean_gold")
|
||||||
print(f" Source : fact_asean_food_security_selected")
|
|
||||||
print(f" NORMALIZE_FRAMEWORKS_JOINTLY : {NORMALIZE_FRAMEWORKS_JOINTLY}")
|
print(f" NORMALIZE_FRAMEWORKS_JOINTLY : {NORMALIZE_FRAMEWORKS_JOINTLY}")
|
||||||
print(f" PERFORMANCE_THRESHOLD : {PERFORMANCE_THRESHOLD}")
|
print(f" PERFORMANCE_THRESHOLD : {PERFORMANCE_THRESHOLD}")
|
||||||
print("=" * 70)
|
print("=" * 70)
|
||||||
|
|||||||
Reference in New Issue
Block a user