new narrative teks
This commit is contained in:
@@ -44,22 +44,24 @@ Output Schema (agg_indicator_norm):
|
|||||||
agg_narrative_indicator
|
agg_narrative_indicator
|
||||||
=============================================================================
|
=============================================================================
|
||||||
Tujuan:
|
Tujuan:
|
||||||
Menghasilkan narasi otomatis 1 paragraf per indikator per tahun di level ASEAN
|
Menghasilkan narasi otomatis 1 paragraf per indikator (level ASEAN,
|
||||||
(rata-rata seluruh negara ASEAN), dijalankan otomatis setelah agg_indicator_norm
|
merangkum seluruh periode + seluruh negara), dijalankan otomatis setelah
|
||||||
selesai dalam pipeline yang sama.
|
agg_indicator_norm selesai dalam pipeline yang sama.
|
||||||
|
|
||||||
Granularity:
|
Granularity:
|
||||||
year x indicator_id (level ASEAN, bukan per negara)
|
indicator_id (all years, all ASEAN countries)
|
||||||
|
|
||||||
Output Schema (agg_narrative_indicator):
|
Output Schema (agg_narrative_indicator):
|
||||||
year, indicator_id, indicator_name, unit, direction,
|
indicator_id, indicator_name, unit, direction,
|
||||||
pillar_name, framework,
|
pillar_name, framework,
|
||||||
avg_value, -- rata-rata value ASEAN
|
year_min, year_max, n_countries,
|
||||||
avg_norm_score_1_100, -- rata-rata norm_score_1_100 ASEAN
|
avg_value_first, avg_value_last,
|
||||||
|
avg_norm_score_1_100,
|
||||||
performance, -- Good | Bad | null
|
performance, -- Good | Bad | null
|
||||||
yoy_avg_value, -- perubahan avg_value vs tahun sebelumnya
|
n_yoy_total, n_yoy_positive,
|
||||||
n_countries, -- jumlah negara yang punya data tahun ini
|
best_yoy_from, best_yoy_to,
|
||||||
narrative -- 1 paragraf narasi otomatis
|
country_worst, country_best,
|
||||||
|
narrative
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@@ -114,10 +116,8 @@ SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
|||||||
"number of women of reproductive age (15-49 years) affected by anemia (million)",
|
"number of women of reproductive age (15-49 years) affected by anemia (million)",
|
||||||
])
|
])
|
||||||
|
|
||||||
# Lowercase set untuk matching case-insensitive
|
|
||||||
_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS)
|
_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS)
|
||||||
|
|
||||||
# FIES-specific keywords untuk deteksi sdgs_start_year
|
|
||||||
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
|
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
|
||||||
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
||||||
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
|
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
|
||||||
@@ -133,12 +133,11 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({
|
|||||||
"positive", "higher_better", "higher_is_better",
|
"positive", "higher_better", "higher_is_better",
|
||||||
})
|
})
|
||||||
|
|
||||||
# Threshold performance label
|
|
||||||
_PERFORMANCE_THRESHOLD: float = 60.0
|
_PERFORMANCE_THRESHOLD: float = 60.0
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# PURE HELPERS — agg_indicator_norm
|
# PURE HELPERS
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def _should_invert(direction: str, logger=None, context: str = "") -> bool:
|
def _should_invert(direction: str, logger=None, context: str = "") -> bool:
|
||||||
@@ -171,12 +170,8 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S
|
|||||||
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
||||||
"""
|
"""
|
||||||
Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year.
|
Hitung YoY untuk satu grup (indicator_id, country_id) yang sudah di-sort by year.
|
||||||
|
Kolom yang ditambahkan: yoy_value, yoy_norm_value.
|
||||||
Kolom yang ditambahkan:
|
Baris pertama tiap grup selalu null.
|
||||||
yoy_value : value - value_prev
|
|
||||||
yoy_norm_value : norm_value - norm_value_prev
|
|
||||||
|
|
||||||
Baris pertama tiap grup selalu null (tidak ada tahun sebelumnya).
|
|
||||||
"""
|
"""
|
||||||
df = df.sort_values("year").copy()
|
df = df.sort_values("year").copy()
|
||||||
|
|
||||||
@@ -199,70 +194,22 @@ def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
|
|||||||
return df
|
return df
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# PURE HELPERS — agg_narrative_indicator
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
def _is_lower_better(direction: str) -> bool:
|
def _is_lower_better(direction: str) -> bool:
|
||||||
return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS
|
return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS
|
||||||
|
|
||||||
|
|
||||||
def _format_value(value: float, unit: str) -> str:
|
|
||||||
"""Format nilai dengan unit yang sesuai."""
|
|
||||||
if pd.isna(value):
|
|
||||||
return "N/A"
|
|
||||||
unit = str(unit).strip() if unit else ""
|
|
||||||
|
|
||||||
if abs(value) >= 1000:
|
|
||||||
formatted = f"{value:,.1f}"
|
|
||||||
elif abs(value) >= 10:
|
|
||||||
formatted = f"{value:.2f}"
|
|
||||||
else:
|
|
||||||
formatted = f"{value:.3f}"
|
|
||||||
|
|
||||||
return f"{formatted} {unit}".strip()
|
|
||||||
|
|
||||||
|
|
||||||
def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple:
|
|
||||||
"""
|
|
||||||
Kembalikan (direction_word, change_desc, is_positive_trend).
|
|
||||||
is_positive_trend: True jika perubahan menguntungkan sesuai direction.
|
|
||||||
"""
|
|
||||||
unit = str(unit).strip() if unit else ""
|
|
||||||
abs_yoy = abs(yoy)
|
|
||||||
|
|
||||||
if abs_yoy >= 1000:
|
|
||||||
yoy_str = f"{abs_yoy:,.1f}"
|
|
||||||
elif abs_yoy >= 10:
|
|
||||||
yoy_str = f"{abs_yoy:.2f}"
|
|
||||||
else:
|
|
||||||
yoy_str = f"{abs_yoy:.3f}"
|
|
||||||
|
|
||||||
change_desc = f"{yoy_str} {unit}".strip()
|
|
||||||
is_positive = (yoy < 0) if lower_better else (yoy > 0)
|
|
||||||
direction_word = "decreased by" if yoy < 0 else "increased by"
|
|
||||||
|
|
||||||
return direction_word, change_desc, is_positive
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# PURE HELPER — narrative builder (per indicator, all years, all countries)
|
# NARRATIVE BUILDER — agg_narrative_indicator
|
||||||
# ======================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def _build_narrative_per_indicator(row: pd.Series) -> str:
|
def _build_narrative_per_indicator(row: pd.Series) -> str:
|
||||||
"""
|
"""
|
||||||
Bangun 1 paragraf narasi ASEAN-level untuk satu indikator,
|
Narrative format (no em-dash, bold on key figures):
|
||||||
merangkum seluruh periode (year_min - year_max) dan seluruh negara.
|
**{indicator}** ({framework}, {pillar}): ASEAN average {rose/fell} from
|
||||||
|
**{first}** to **{last}** ({year_min} to {year_max}), **{improving/deteriorating}** trend.
|
||||||
Kolom yang dibutuhkan dari row:
|
Score: **{score}/100** (*{Good/Bad}*).
|
||||||
indicator_name, unit, direction, pillar_name, framework,
|
Best country: **{best}**; worst: **{worst}**.
|
||||||
year_min, year_max, n_countries,
|
Improved in **{n_pos}/{n_total}** YoY transitions.
|
||||||
avg_value_first, avg_value_last,
|
|
||||||
avg_norm_score_1_100, -- rata-rata seluruh periode
|
|
||||||
performance, -- Good | Bad | null
|
|
||||||
n_yoy_total, -- total transisi year-on-year
|
|
||||||
n_yoy_positive, -- jumlah transisi yang membaik
|
|
||||||
best_yoy_from, best_yoy_to, -- periode dengan perbaikan terbesar
|
|
||||||
country_worst, country_best -- negara dengan nilai terburuk / terbaik
|
|
||||||
"""
|
"""
|
||||||
ind_name = str(row["indicator_name"]).strip()
|
ind_name = str(row["indicator_name"]).strip()
|
||||||
unit = str(row["unit"]).strip() if row["unit"] else ""
|
unit = str(row["unit"]).strip() if row["unit"] else ""
|
||||||
@@ -287,118 +234,67 @@ def _build_narrative_per_indicator(row: pd.Series) -> str:
|
|||||||
country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None
|
country_worst = str(row["country_worst"]).strip() if not pd.isna(row["country_worst"]) else None
|
||||||
country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None
|
country_best = str(row["country_best"]).strip() if not pd.isna(row["country_best"]) else None
|
||||||
|
|
||||||
lower_better = _is_lower_better(direction)
|
lower_better = _is_lower_better(direction)
|
||||||
direction_label = (
|
|
||||||
"lower values indicate better outcomes"
|
|
||||||
if lower_better
|
|
||||||
else "higher values indicate better outcomes"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ---- Kalimat 1: Identifikasi indikator + cakupan -------------------------
|
def _fmt(v):
|
||||||
member_str = f"{n_countries} member state{'s' if n_countries > 1 else ''}"
|
if pd.isna(v):
|
||||||
sentence1 = (
|
return "N/A"
|
||||||
f"Across ASEAN, {ind_name} under the {framework} framework "
|
abs_v = abs(v)
|
||||||
f"({pillar} pillar) was monitored from {year_min} to {year_max} "
|
if abs_v >= 1000:
|
||||||
f"across {member_str}."
|
s = f"{v:,.1f}"
|
||||||
)
|
elif abs_v >= 10:
|
||||||
|
s = f"{v:.2f}"
|
||||||
|
else:
|
||||||
|
s = f"{v:.3f}"
|
||||||
|
return f"{s} {unit}".strip() if unit else s
|
||||||
|
|
||||||
# ---- Kalimat 2: Tren keseluruhan (first → last) --------------------------
|
# Sentence 1: trend first -> last
|
||||||
if not pd.isna(avg_first) and not pd.isna(avg_last):
|
if not pd.isna(avg_first) and not pd.isna(avg_last):
|
||||||
diff = avg_last - avg_first
|
diff = avg_last - avg_first
|
||||||
abs_diff = abs(diff)
|
|
||||||
|
|
||||||
# Format nilai
|
|
||||||
def fmt(v):
|
|
||||||
if abs(v) >= 1000:
|
|
||||||
return f"{v:,.1f}"
|
|
||||||
elif abs(v) >= 10:
|
|
||||||
return f"{v:.2f}"
|
|
||||||
else:
|
|
||||||
return f"{v:.3f}"
|
|
||||||
|
|
||||||
first_str = f"{fmt(avg_first)}{' ' + unit if unit else ''}"
|
|
||||||
last_str = f"{fmt(avg_last)}{' ' + unit if unit else ''}"
|
|
||||||
diff_str = f"{fmt(abs_diff)}{' ' + unit if unit else ''}"
|
|
||||||
|
|
||||||
# Apakah tren menguntungkan?
|
|
||||||
is_improving = (diff < 0) if lower_better else (diff > 0)
|
is_improving = (diff < 0) if lower_better else (diff > 0)
|
||||||
trend_word = "improving" if is_improving else "deteriorating"
|
trend_label = "improving" if is_improving else "deteriorating"
|
||||||
verb = "declining" if diff < 0 else "rising"
|
verb = "fell" if diff < 0 else "rose"
|
||||||
|
sent1 = (
|
||||||
sentence2 = (
|
f"**{ind_name}** ({framework}, {pillar}): ASEAN average {verb} from "
|
||||||
f"Since {direction_label}, the region collectively showed "
|
f"**{_fmt(avg_first)}** to **{_fmt(avg_last)}** ({year_min} to {year_max}), "
|
||||||
f"{'an' if trend_word[0] in 'aeiou' else 'a'} {trend_word} trend, "
|
f"**{trend_label}** trend."
|
||||||
f"with the ASEAN average {verb} from {first_str} in {year_min} "
|
|
||||||
f"to {last_str} in {year_max} "
|
|
||||||
f"(a cumulative {'reduction' if diff < 0 else 'increase'} of {diff_str})."
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
sentence2 = (
|
sent1 = (
|
||||||
f"Since {direction_label}, trend analysis could not be performed "
|
f"**{ind_name}** ({framework}, {pillar}): trend data unavailable "
|
||||||
f"due to missing data at the start or end of the period."
|
f"({year_min} to {year_max}, {n_countries} members)."
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- Kalimat 3: Score + performance -------------------------------------
|
# Sentence 2: score + performance
|
||||||
if not pd.isna(avg_score):
|
if not pd.isna(avg_score):
|
||||||
score_str = f"{avg_score:.1f} out of 100"
|
perf_label = f"*{performance}*" if performance in ("Good", "Bad") else ""
|
||||||
if performance == "Good":
|
sent2 = f"Score: **{avg_score:.1f}/100** {perf_label}.".strip()
|
||||||
sentence3 = (
|
|
||||||
f"The regional normalized score averaged {score_str} "
|
|
||||||
f"classified as Good performance."
|
|
||||||
)
|
|
||||||
elif performance == "Bad":
|
|
||||||
sentence3 = (
|
|
||||||
f"The regional normalized score averaged {score_str} "
|
|
||||||
f"classified as Bad performance, falling below the 60-point threshold."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sentence3 = (
|
|
||||||
f"The regional normalized score averaged {score_str}."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
sentence3 = "The regional normalized performance score could not be assessed."
|
sent2 = "Score unavailable."
|
||||||
|
|
||||||
# ---- Kalimat 4: Negara terbaik & terburuk --------------------------------
|
# Sentence 3: best / worst country
|
||||||
if country_worst and country_best and country_worst != country_best:
|
if country_best and country_worst and country_best != country_worst:
|
||||||
if lower_better:
|
sent3 = f"Best country: **{country_best}**; worst: **{country_worst}**."
|
||||||
worst_label = "highest (most concerning)"
|
|
||||||
best_label = "consistently performed best (lowest values)"
|
|
||||||
else:
|
|
||||||
worst_label = "lowest (most concerning)"
|
|
||||||
best_label = "consistently performed best (highest values)"
|
|
||||||
|
|
||||||
sentence4 = (
|
|
||||||
f"Among member states, {country_worst} recorded the {worst_label} "
|
|
||||||
f"levels throughout the period, while {country_best} {best_label}."
|
|
||||||
)
|
|
||||||
elif country_best:
|
elif country_best:
|
||||||
sentence4 = (
|
sent3 = f"Best country: **{country_best}**."
|
||||||
f"Among member states, {country_best} consistently recorded the "
|
|
||||||
f"best performance throughout the period."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
sentence4 = ""
|
sent3 = ""
|
||||||
|
|
||||||
# ---- Kalimat 5: YoY transitions -----------------------------------------
|
# Sentence 4: YoY transitions
|
||||||
if n_yoy_total > 0:
|
if n_yoy_total > 0:
|
||||||
yoy_sentence = (
|
best_period = ""
|
||||||
f"Year-on-year, the region improved in {n_yoy_positive} out of "
|
|
||||||
f"{n_yoy_total} transition{'s' if n_yoy_total > 1 else ''}"
|
|
||||||
)
|
|
||||||
if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to):
|
if not pd.isna(best_yoy_from) and not pd.isna(best_yoy_to):
|
||||||
yoy_sentence += (
|
best_period = f", best gain: **{int(best_yoy_from)} to {int(best_yoy_to)}**"
|
||||||
f", with the largest regional gain occurring between "
|
sent4 = (
|
||||||
f"{int(best_yoy_from)} and {int(best_yoy_to)}."
|
f"Improved in **{n_yoy_positive}/{n_yoy_total}** YoY transitions{best_period}."
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
yoy_sentence += "."
|
|
||||||
else:
|
else:
|
||||||
yoy_sentence = "Insufficient data to assess year-on-year transitions."
|
sent4 = "Insufficient data for YoY assessment."
|
||||||
|
|
||||||
parts = [sentence1, sentence2, sentence3]
|
parts = [sent1, sent2]
|
||||||
if sentence4:
|
if sent3:
|
||||||
parts.append(sentence4)
|
parts.append(sent3)
|
||||||
parts.append(yoy_sentence)
|
parts.append(sent4)
|
||||||
|
|
||||||
return " ".join(parts)
|
return " ".join(parts)
|
||||||
|
|
||||||
@@ -427,10 +323,10 @@ class IndicatorNormAggregator:
|
|||||||
11. Summary log agg_indicator_norm
|
11. Summary log agg_indicator_norm
|
||||||
|
|
||||||
Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada):
|
Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada):
|
||||||
12. Agregasi ke level ASEAN (year x indicator_id)
|
12. Agregasi ke level ASEAN per indicator_id
|
||||||
13. Hitung YoY avg_value per indikator
|
13. Hitung YoY avg_value per indikator
|
||||||
14. Assign performance berdasarkan avg_norm_score
|
14. Assign performance berdasarkan avg_norm_score
|
||||||
15. Build narrative 1 paragraf per baris
|
15. Build narrative 1 paragraf per indikator
|
||||||
16. Simpan ke BigQuery -> agg_narrative_indicator
|
16. Simpan ke BigQuery -> agg_narrative_indicator
|
||||||
17. Summary log agg_narrative_indicator
|
17. Summary log agg_narrative_indicator
|
||||||
"""
|
"""
|
||||||
@@ -769,11 +665,6 @@ class IndicatorNormAggregator:
|
|||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _assign_performance(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
"""
|
|
||||||
performance = "Good" jika norm_score_1_100 >= 60
|
|
||||||
= "Bad" jika norm_score_1_100 < 60
|
|
||||||
= null jika norm_score_1_100 null
|
|
||||||
"""
|
|
||||||
self.logger.info("\n" + "=" * 80)
|
self.logger.info("\n" + "=" * 80)
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f"STEP 9: ASSIGN PERFORMANCE LABEL "
|
f"STEP 9: ASSIGN PERFORMANCE LABEL "
|
||||||
@@ -832,7 +723,6 @@ class IndicatorNormAggregator:
|
|||||||
["year", "country_name", "pillar_name", "indicator_name"]
|
["year", "country_name", "pillar_name", "indicator_name"]
|
||||||
).reset_index(drop=True)
|
).reset_index(drop=True)
|
||||||
|
|
||||||
# Cast
|
|
||||||
out["year"] = out["year"].astype(int)
|
out["year"] = out["year"].astype(int)
|
||||||
out["country_id"] = out["country_id"].astype(int)
|
out["country_id"] = out["country_id"].astype(int)
|
||||||
out["country_name"] = out["country_name"].astype(str)
|
out["country_name"] = out["country_name"].astype(str)
|
||||||
@@ -949,7 +839,6 @@ class IndicatorNormAggregator:
|
|||||||
f"{r['avg_score']:.2f}"
|
f"{r['avg_score']:.2f}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Performance summary per framework
|
|
||||||
self.logger.info("\n Performance summary per Framework:")
|
self.logger.info("\n Performance summary per Framework:")
|
||||||
perf_fw = (
|
perf_fw = (
|
||||||
df[df["performance"].notna()]
|
df[df["performance"].notna()]
|
||||||
@@ -967,7 +856,6 @@ class IndicatorNormAggregator:
|
|||||||
f"({r['count']/total*100:.1f}%)"
|
f"({r['count']/total*100:.1f}%)"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Top 5 & Bottom 5 indikator
|
|
||||||
ind_avg = (
|
ind_avg = (
|
||||||
df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"])
|
df.groupby(["indicator_id", "indicator_name", "unit", "pillar_name", "direction"])
|
||||||
["norm_score_1_100"].mean()
|
["norm_score_1_100"].mean()
|
||||||
@@ -995,7 +883,6 @@ class IndicatorNormAggregator:
|
|||||||
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
|
f"{r['norm_score_1_100']:.2f} {tag} {unit}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Indikator per pillar
|
|
||||||
pillar_summary = (
|
pillar_summary = (
|
||||||
df.drop_duplicates(subset=["indicator_id", "pillar_name"])
|
df.drop_duplicates(subset=["indicator_id", "pillar_name"])
|
||||||
.groupby("pillar_name")["indicator_id"]
|
.groupby("pillar_name")["indicator_id"]
|
||||||
@@ -1008,29 +895,24 @@ class IndicatorNormAggregator:
|
|||||||
self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
|
self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# STEP 12-16: agg_narrative_indicator (lanjutan dari df_final)
|
# STEP 12-16: agg_narrative_indicator
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _build_narrative_table(self, df_final: pd.DataFrame):
|
def _build_narrative_table(self, df_final: pd.DataFrame):
|
||||||
"""
|
"""
|
||||||
Pipeline agg_narrative_indicator — granularity: per indicator_id (1 baris per indikator).
|
Pipeline agg_narrative_indicator.
|
||||||
Narasi merangkum seluruh periode + seluruh negara ASEAN.
|
Granularity: per indicator_id (1 baris per indikator, all years, all countries).
|
||||||
Dijalankan otomatis setelah agg_indicator_norm selesai.
|
|
||||||
"""
|
"""
|
||||||
self.logger.info("\n" + "=" * 80)
|
self.logger.info("\n" + "=" * 80)
|
||||||
self.logger.info("STEP 12-16: agg_narrative_indicator")
|
self.logger.info("STEP 12-16: agg_narrative_indicator")
|
||||||
self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)")
|
self.logger.info(" Level : per indicator_id (all years + all ASEAN countries)")
|
||||||
self.logger.info("=" * 80)
|
self.logger.info("=" * 80)
|
||||||
|
|
||||||
# -- STEP 12: Hitung statistik agregat per (indicator_id, country_id, year) --
|
|
||||||
self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---")
|
|
||||||
|
|
||||||
df = df_final.copy()
|
df = df_final.copy()
|
||||||
|
|
||||||
# Dimensi tetap per indikator
|
|
||||||
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
|
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
|
||||||
|
|
||||||
# ---- 12a. ASEAN avg per (indicator_id, year) -> untuk first/last & YoY ---
|
# ---- 12a. ASEAN avg per (indicator_id, year) -------------------------
|
||||||
|
self.logger.info("\n--- STEP 12: COMPUTE INDICATOR-LEVEL STATS ---")
|
||||||
df_yr = (
|
df_yr = (
|
||||||
df.groupby(["indicator_id", "year"])
|
df.groupby(["indicator_id", "year"])
|
||||||
.agg(
|
.agg(
|
||||||
@@ -1041,38 +923,35 @@ class IndicatorNormAggregator:
|
|||||||
.reset_index()
|
.reset_index()
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12b. first year / last year avg value per indikator -----------------
|
# ---- 12b. first / last avg value per indikator -----------------------
|
||||||
df_first = (
|
df_first = (
|
||||||
df_yr.sort_values("year")
|
df_yr.sort_values("year")
|
||||||
.groupby("indicator_id")
|
.groupby("indicator_id").first().reset_index()
|
||||||
.first()
|
[["indicator_id", "year", "avg_value"]]
|
||||||
.reset_index()[["indicator_id", "year", "avg_value"]]
|
|
||||||
.rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
|
.rename(columns={"year": "year_min", "avg_value": "avg_value_first"})
|
||||||
)
|
)
|
||||||
df_last = (
|
df_last = (
|
||||||
df_yr.sort_values("year")
|
df_yr.sort_values("year")
|
||||||
.groupby("indicator_id")
|
.groupby("indicator_id").last().reset_index()
|
||||||
.last()
|
[["indicator_id", "year", "avg_value"]]
|
||||||
.reset_index()[["indicator_id", "year", "avg_value"]]
|
|
||||||
.rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
|
.rename(columns={"year": "year_max", "avg_value": "avg_value_last"})
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12c. Rata-rata norm_score seluruh periode ----------------------------
|
# ---- 12c. Rata-rata norm_score seluruh periode -----------------------
|
||||||
df_score_avg = (
|
df_score_avg = (
|
||||||
df_yr.groupby("indicator_id")
|
df_yr.groupby("indicator_id")
|
||||||
.agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
|
.agg(avg_norm_score_1_100=("avg_norm_score", "mean"))
|
||||||
.reset_index()
|
.reset_index()
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12d. n_countries: maks negara yang pernah hadir ---------------------
|
# ---- 12d. n_countries ------------------------------------------------
|
||||||
df_nc = (
|
df_nc = (
|
||||||
df.groupby("indicator_id")["country_id"]
|
df.groupby("indicator_id")["country_id"]
|
||||||
.nunique()
|
.nunique().reset_index()
|
||||||
.reset_index()
|
|
||||||
.rename(columns={"country_id": "n_countries"})
|
.rename(columns={"country_id": "n_countries"})
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12e. YoY per (indicator_id) di level ASEAN avg ----------------------
|
# ---- 12e. YoY per indicator (ASEAN avg) ------------------------------
|
||||||
self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---")
|
self.logger.info("\n--- STEP 13: COMPUTE YoY (ASEAN avg, per indicator) ---")
|
||||||
|
|
||||||
yoy_parts = []
|
yoy_parts = []
|
||||||
@@ -1088,7 +967,6 @@ class IndicatorNormAggregator:
|
|||||||
yoy_parts.append(grp)
|
yoy_parts.append(grp)
|
||||||
df_yr = pd.concat(yoy_parts, ignore_index=True)
|
df_yr = pd.concat(yoy_parts, ignore_index=True)
|
||||||
|
|
||||||
# Ambil direction per indikator untuk tentukan "improving"
|
|
||||||
dir_map = (
|
dir_map = (
|
||||||
df[["indicator_id", "direction"]]
|
df[["indicator_id", "direction"]]
|
||||||
.drop_duplicates(subset=["indicator_id"])
|
.drop_duplicates(subset=["indicator_id"])
|
||||||
@@ -1097,27 +975,20 @@ class IndicatorNormAggregator:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _is_positive_yoy(ind_id, yoy_val):
|
def _is_positive_yoy(ind_id, yoy_val):
|
||||||
"""True jika perubahan yoy menguntungkan sesuai direction."""
|
|
||||||
if pd.isna(yoy_val):
|
if pd.isna(yoy_val):
|
||||||
return False
|
return False
|
||||||
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
||||||
return (yoy_val < 0) if lb else (yoy_val > 0)
|
return (yoy_val < 0) if lb else (yoy_val > 0)
|
||||||
|
|
||||||
# Hitung n_yoy_total, n_yoy_positive, best_yoy
|
|
||||||
yoy_stats = []
|
yoy_stats = []
|
||||||
for ind_id, grp in df_yr.groupby("indicator_id"):
|
for ind_id, grp in df_yr.groupby("indicator_id"):
|
||||||
grp_yoy = grp[grp["yoy"].notna()].copy()
|
grp_yoy = grp[grp["yoy"].notna()].copy()
|
||||||
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
lb = _is_lower_better(dir_map.get(ind_id, "positive"))
|
||||||
|
|
||||||
n_total = len(grp_yoy)
|
n_total = len(grp_yoy)
|
||||||
n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"]))
|
n_positive = int(sum(_is_positive_yoy(ind_id, v) for v in grp_yoy["yoy"]))
|
||||||
|
|
||||||
# "Best" = perubahan paling menguntungkan
|
|
||||||
if n_total > 0:
|
if n_total > 0:
|
||||||
if lb:
|
idx_best = grp_yoy["yoy"].idxmin() if lb else grp_yoy["yoy"].idxmax()
|
||||||
idx_best = grp_yoy["yoy"].idxmin() # paling negatif = paling baik
|
|
||||||
else:
|
|
||||||
idx_best = grp_yoy["yoy"].idxmax() # paling positif = paling baik
|
|
||||||
best_row = grp_yoy.loc[idx_best]
|
best_row = grp_yoy.loc[idx_best]
|
||||||
best_yoy_from = best_row["year"] - 1
|
best_yoy_from = best_row["year"] - 1
|
||||||
best_yoy_to = best_row["year"]
|
best_yoy_to = best_row["year"]
|
||||||
@@ -1135,7 +1006,7 @@ class IndicatorNormAggregator:
|
|||||||
|
|
||||||
df_yoy_stats = pd.DataFrame(yoy_stats)
|
df_yoy_stats = pd.DataFrame(yoy_stats)
|
||||||
|
|
||||||
# ---- 12f. Country terbaik & terburuk (rata-rata value seluruh periode) ---
|
# ---- 12f. Country terbaik & terburuk ---------------------------------
|
||||||
df_country_avg = (
|
df_country_avg = (
|
||||||
df.groupby(["indicator_id", "country_id", "country_name"])
|
df.groupby(["indicator_id", "country_id", "country_name"])
|
||||||
.agg(country_avg_value=("value", "mean"))
|
.agg(country_avg_value=("value", "mean"))
|
||||||
@@ -1152,33 +1023,33 @@ class IndicatorNormAggregator:
|
|||||||
worst_row = grp.loc[grp["country_avg_value"].idxmin()]
|
worst_row = grp.loc[grp["country_avg_value"].idxmin()]
|
||||||
best_row = grp.loc[grp["country_avg_value"].idxmax()]
|
best_row = grp.loc[grp["country_avg_value"].idxmax()]
|
||||||
country_stats.append({
|
country_stats.append({
|
||||||
"indicator_id": ind_id,
|
"indicator_id" : ind_id,
|
||||||
"country_worst": worst_row["country_name"],
|
"country_worst": worst_row["country_name"],
|
||||||
"country_best" : best_row["country_name"],
|
"country_best" : best_row["country_name"],
|
||||||
})
|
})
|
||||||
df_country_stats = pd.DataFrame(country_stats)
|
df_country_stats = pd.DataFrame(country_stats)
|
||||||
|
|
||||||
# ---- 12g. Dimensi tetap per indikator ------------------------------------
|
# ---- 12g. Dimensi tetap per indikator --------------------------------
|
||||||
df_dim = (
|
df_dim = (
|
||||||
df[["indicator_id"] + dim_cols]
|
df[["indicator_id"] + dim_cols]
|
||||||
.drop_duplicates(subset=["indicator_id"])
|
.drop_duplicates(subset=["indicator_id"])
|
||||||
)
|
)
|
||||||
|
|
||||||
# ---- 12h. Merge semua -------------------------------------------------------
|
# ---- 12h. Merge semua ------------------------------------------------
|
||||||
df_agg = (
|
df_agg = (
|
||||||
df_dim
|
df_dim
|
||||||
.merge(df_first, on="indicator_id", how="left")
|
.merge(df_first, on="indicator_id", how="left")
|
||||||
.merge(df_last, on="indicator_id", how="left")
|
.merge(df_last, on="indicator_id", how="left")
|
||||||
.merge(df_score_avg, on="indicator_id", how="left")
|
.merge(df_score_avg, on="indicator_id", how="left")
|
||||||
.merge(df_nc, on="indicator_id", how="left")
|
.merge(df_nc, on="indicator_id", how="left")
|
||||||
.merge(df_yoy_stats, on="indicator_id", how="left")
|
.merge(df_yoy_stats, on="indicator_id", how="left")
|
||||||
.merge(df_country_stats,on="indicator_id", how="left")
|
.merge(df_country_stats, on="indicator_id", how="left")
|
||||||
)
|
)
|
||||||
|
|
||||||
self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}")
|
self.logger.info(f" Rows (1 per indicator) : {len(df_agg):,}")
|
||||||
self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}")
|
self.logger.info(f" Indicators : {df_agg['indicator_id'].nunique()}")
|
||||||
|
|
||||||
# -- STEP 14: Assign performance --------------------------------------------
|
# -- STEP 14: Assign performance ---------------------------------------
|
||||||
self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
|
self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
|
||||||
df_agg["performance"] = pd.NA
|
df_agg["performance"] = pd.NA
|
||||||
has_score = df_agg["avg_norm_score_1_100"].notna()
|
has_score = df_agg["avg_norm_score_1_100"].notna()
|
||||||
@@ -1188,7 +1059,7 @@ class IndicatorNormAggregator:
|
|||||||
n_bad = (df_agg["performance"] == "Bad").sum()
|
n_bad = (df_agg["performance"] == "Bad").sum()
|
||||||
self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")
|
self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")
|
||||||
|
|
||||||
# -- STEP 15: Build narrative -----------------------------------------------
|
# -- STEP 15: Build narrative ------------------------------------------
|
||||||
self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---")
|
self.logger.info("\n--- STEP 15: BUILD NARRATIVE (per indicator, all years) ---")
|
||||||
df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1)
|
df_agg["narrative"] = df_agg.apply(_build_narrative_per_indicator, axis=1)
|
||||||
self.logger.info(f" Narratives generated: {len(df_agg):,}")
|
self.logger.info(f" Narratives generated: {len(df_agg):,}")
|
||||||
@@ -1199,7 +1070,7 @@ class IndicatorNormAggregator:
|
|||||||
f"\n -> {row['narrative'][:300]}..."
|
f"\n -> {row['narrative'][:300]}..."
|
||||||
)
|
)
|
||||||
|
|
||||||
# -- STEP 16: Save ----------------------------------------------------------
|
# -- STEP 16: Save -----------------------------------------------------
|
||||||
self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
|
self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
|
||||||
out = df_agg[[
|
out = df_agg[[
|
||||||
"indicator_id", "indicator_name", "unit", "direction",
|
"indicator_id", "indicator_name", "unit", "direction",
|
||||||
@@ -1215,7 +1086,6 @@ class IndicatorNormAggregator:
|
|||||||
|
|
||||||
out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)
|
out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)
|
||||||
|
|
||||||
# Cast
|
|
||||||
out["indicator_id"] = out["indicator_id"].astype(int)
|
out["indicator_id"] = out["indicator_id"].astype(int)
|
||||||
out["indicator_name"] = out["indicator_name"].astype(str)
|
out["indicator_name"] = out["indicator_name"].astype(str)
|
||||||
out["unit"] = out["unit"].fillna("").astype(str)
|
out["unit"] = out["unit"].fillna("").astype(str)
|
||||||
@@ -1296,7 +1166,6 @@ class IndicatorNormAggregator:
|
|||||||
|
|
||||||
self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
|
self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# RUN
|
# RUN
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
@@ -1326,7 +1195,6 @@ class IndicatorNormAggregator:
|
|||||||
self.pipeline_metadata["rows_loaded"] = rows_loaded
|
self.pipeline_metadata["rows_loaded"] = rows_loaded
|
||||||
self._log_summary(df_final)
|
self._log_summary(df_final)
|
||||||
|
|
||||||
# Lanjut build agg_narrative_indicator dari df_final (tanpa re-load BQ)
|
|
||||||
self._build_narrative_table(df_final)
|
self._build_narrative_table(df_final)
|
||||||
|
|
||||||
self.pipeline_metadata["end_time"] = datetime.now()
|
self.pipeline_metadata["end_time"] = datetime.now()
|
||||||
@@ -1345,7 +1213,7 @@ class IndicatorNormAggregator:
|
|||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# AIRFLOW TASK <-- tidak berubah
|
# AIRFLOW TASK
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def run_indicator_norm_aggregation():
|
def run_indicator_norm_aggregation():
|
||||||
|
|||||||
@@ -219,113 +219,58 @@ def _build_overview_narrative(
|
|||||||
most_declined_country,
|
most_declined_country,
|
||||||
most_declined_delta,
|
most_declined_delta,
|
||||||
) -> str:
|
) -> str:
|
||||||
# Sentence 1: indicator breakdown
|
"""
|
||||||
parts_ind = []
|
Narrative format (no em-dash):
|
||||||
|
In {year}, ASEAN scored {score} ({performance}) across {n_total} indicators
|
||||||
|
({n_mdg} MDGs, {n_sdg} SDGs). Score {increased/decreased} by {delta} pts from
|
||||||
|
{prev_year} ({prev_score}). {top_country} led the region; {bottom_country} ranked
|
||||||
|
last. Biggest gain: {country}; biggest drop: {country}.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Sentence 1: score + performance + indicators
|
||||||
|
ind_parts = []
|
||||||
if n_mdg > 0:
|
if n_mdg > 0:
|
||||||
parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}")
|
ind_parts.append(f"**{n_mdg} MDGs**")
|
||||||
if n_sdg > 0:
|
if n_sdg > 0:
|
||||||
parts_ind.append(f"{n_sdg} SDG indicator{'s' if n_sdg > 1 else ''}")
|
ind_parts.append(f"**{n_sdg} SDGs**")
|
||||||
|
ind_detail = f" ({', '.join(ind_parts)})" if ind_parts else ""
|
||||||
|
|
||||||
if parts_ind:
|
sent1 = (
|
||||||
ind_detail = " and ".join(parts_ind)
|
f"In **{year}**, ASEAN scored **{_fmt_score(score)}** (*{performance_status}*) "
|
||||||
sent1 = (
|
f"across **{n_total_ind} indicators**{ind_detail}."
|
||||||
f"In {year}, the ASEAN food security assessment incorporated a total of "
|
|
||||||
f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}, "
|
|
||||||
f"consisting of {ind_detail}."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sent1 = (
|
|
||||||
f"In {year}, the ASEAN food security assessment incorporated "
|
|
||||||
f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Sentence 2: score + performance status + YoY
|
|
||||||
status_phrase = (
|
|
||||||
f"classified as \"{performance_status}\" performance "
|
|
||||||
f"(threshold: {PERFORMANCE_THRESHOLD:.0f})"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Sentence 2: YoY
|
||||||
if yoy_val is not None and prev_score is not None:
|
if yoy_val is not None and prev_score is not None:
|
||||||
direction_word = "increasing" if yoy_val >= 0 else "decreasing"
|
direction_word = "increased" if yoy_val >= 0 else "decreased"
|
||||||
pct_clause = ""
|
|
||||||
if yoy_pct is not None:
|
|
||||||
abs_pct = abs(yoy_pct)
|
|
||||||
trend_word = "improvement" if yoy_val >= 0 else "decline"
|
|
||||||
pct_clause = f", representing a {abs_pct:.2f}% {trend_word} year-over-year"
|
|
||||||
|
|
||||||
status_change = ""
|
|
||||||
if prev_performance_status not in ("N/A", None) and prev_performance_status != performance_status:
|
|
||||||
status_change = (
|
|
||||||
f" This marks a shift from \"{prev_performance_status}\" in {prev_year} "
|
|
||||||
f"to \"{performance_status}\" in {year}."
|
|
||||||
)
|
|
||||||
|
|
||||||
sent2 = (
|
sent2 = (
|
||||||
f"The ASEAN overall score (Total framework) reached {_fmt_score(score)}, "
|
f"Score {direction_word} by **{abs(yoy_val):.2f} pts** "
|
||||||
f"{status_phrase}, {direction_word} by {abs(yoy_val):.2f} points compared to "
|
f"from {prev_year} ({_fmt_score(prev_score)}, *{prev_performance_status}*)."
|
||||||
f"{prev_year} ({_fmt_score(prev_score)}, \"{prev_performance_status}\"){pct_clause}.{status_change}"
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
sent2 = (
|
sent2 = "No prior-year data available for comparison."
|
||||||
f"The ASEAN overall score (Total framework) reached {_fmt_score(score)} in {year}, "
|
|
||||||
f"{status_phrase}. No prior-year data is available for year-over-year comparison."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Sentence 3: country ranking
|
# Sentence 3: country ranking
|
||||||
sent3 = ""
|
sent3 = ""
|
||||||
if ranking_list:
|
if ranking_list:
|
||||||
first = ranking_list[0]
|
first = ranking_list[0]
|
||||||
last = ranking_list[-1]
|
last = ranking_list[-1]
|
||||||
middle = ranking_list[1:-1]
|
|
||||||
|
|
||||||
if len(ranking_list) == 1:
|
if len(ranking_list) == 1:
|
||||||
sent3 = (
|
sent3 = f"**{first['country_name']}** was the only country assessed ({_fmt_score(first['score'])})."
|
||||||
f"In terms of country performance, {first['country_name']} was the only "
|
|
||||||
f"country assessed, scoring {_fmt_score(first['score'])} in {year}."
|
|
||||||
)
|
|
||||||
elif len(ranking_list) == 2:
|
|
||||||
sent3 = (
|
|
||||||
f"In terms of country performance, {first['country_name']} led the region "
|
|
||||||
f"with a score of {_fmt_score(first['score'])}, while "
|
|
||||||
f"{last['country_name']} recorded the lowest score of "
|
|
||||||
f"{_fmt_score(last['score'])} in {year}."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
middle_parts = [
|
|
||||||
f"{c['country_name']} ({_fmt_score(c['score'])})" for c in middle
|
|
||||||
]
|
|
||||||
if len(middle_parts) == 1:
|
|
||||||
middle_str = middle_parts[0]
|
|
||||||
else:
|
|
||||||
middle_str = ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}"
|
|
||||||
sent3 = (
|
sent3 = (
|
||||||
f"In terms of country performance, {first['country_name']} led the region "
|
f"**{first['country_name']}** led the region ({_fmt_score(first['score'])}); "
|
||||||
f"with a score of {_fmt_score(first['score'])}, followed by {middle_str}. "
|
f"**{last['country_name']}** ranked last ({_fmt_score(last['score'])})."
|
||||||
f"At the other end, {last['country_name']} recorded the lowest score "
|
|
||||||
f"of {_fmt_score(last['score'])} in {year}."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sentence 4: most improved / declined country
|
# Sentence 4: most improved / declined
|
||||||
sent4_parts = []
|
sent4_parts = []
|
||||||
if most_improved_country and most_improved_delta is not None:
|
if most_improved_country and most_improved_delta is not None:
|
||||||
sent4_parts.append(
|
sent4_parts.append(f"Biggest gain: **{most_improved_country}** ({_fmt_delta(most_improved_delta)} pts)")
|
||||||
f"the most notable improvement was seen in {most_improved_country}, "
|
|
||||||
f"which gained {_fmt_delta(most_improved_delta)} points from the previous year"
|
|
||||||
)
|
|
||||||
if most_declined_country and most_declined_delta is not None:
|
if most_declined_country and most_declined_delta is not None:
|
||||||
if most_declined_delta < 0:
|
sent4_parts.append(f"biggest drop: **{most_declined_country}** ({_fmt_delta(most_declined_delta)} pts)")
|
||||||
sent4_parts.append(
|
sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else ""
|
||||||
f"while {most_declined_country} experienced the largest decline "
|
if sent4:
|
||||||
f"of {_fmt_delta(most_declined_delta)} points"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sent4_parts.append(
|
|
||||||
f"while {most_declined_country} recorded the smallest gain "
|
|
||||||
f"of {_fmt_delta(most_declined_delta)} points"
|
|
||||||
)
|
|
||||||
|
|
||||||
sent4 = ""
|
|
||||||
if sent4_parts:
|
|
||||||
sent4 = ", ".join(sent4_parts) + "."
|
|
||||||
sent4 = sent4[0].upper() + sent4[1:]
|
sent4 = sent4[0].upper() + sent4[1:]
|
||||||
|
|
||||||
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
||||||
@@ -351,70 +296,55 @@ def _build_pillar_narrative(
|
|||||||
most_declined_pillar,
|
most_declined_pillar,
|
||||||
most_declined_delta,
|
most_declined_delta,
|
||||||
) -> str:
|
) -> str:
|
||||||
|
"""
|
||||||
|
Narrative format (no em-dash):
|
||||||
|
In {year}, {pillar} ranked {rank}/{n} with score {score}, {up/down} {delta} pts YoY.
|
||||||
|
Top country: {top_country}; bottom: {bot_country}.
|
||||||
|
Strongest pillar: {pillar}; weakest: {pillar}.
|
||||||
|
"""
|
||||||
|
|
||||||
rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th")
|
rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th")
|
||||||
|
|
||||||
|
# Sentence 1: rank + score + YoY
|
||||||
|
if yoy_val is not None:
|
||||||
|
direction_word = "up" if yoy_val >= 0 else "down"
|
||||||
|
yoy_clause = f", {direction_word} **{abs(yoy_val):.2f} pts** YoY"
|
||||||
|
else:
|
||||||
|
yoy_clause = ", no prior-year data"
|
||||||
|
|
||||||
sent1 = (
|
sent1 = (
|
||||||
f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, "
|
f"In **{year}**, **{pillar_name}** ranked **{rank_in_year}{rank_suffix}/{n_pillars}** "
|
||||||
f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN."
|
f"with score **{_fmt_score(pillar_score)}**{yoy_clause}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Sentence 2: top / bottom country
|
||||||
sent2 = ""
|
sent2 = ""
|
||||||
if strongest_pillar and weakest_pillar:
|
|
||||||
if strongest_pillar == pillar_name:
|
|
||||||
sent2 = (
|
|
||||||
f"This made {pillar_name} the strongest performing pillar in {year}, "
|
|
||||||
f"compared to the weakest pillar, {weakest_pillar}, "
|
|
||||||
f"which scored {_fmt_score(weakest_score)}."
|
|
||||||
)
|
|
||||||
elif weakest_pillar == pillar_name:
|
|
||||||
sent2 = (
|
|
||||||
f"This made {pillar_name} the weakest performing pillar in {year}, "
|
|
||||||
f"compared to the strongest pillar, {strongest_pillar}, "
|
|
||||||
f"which scored {_fmt_score(strongest_score)}."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sent2 = (
|
|
||||||
f"Across all pillars in {year}, {strongest_pillar} was the strongest "
|
|
||||||
f"(score: {_fmt_score(strongest_score)}), while {weakest_pillar} "
|
|
||||||
f"was the weakest (score: {_fmt_score(weakest_score)})."
|
|
||||||
)
|
|
||||||
|
|
||||||
sent3 = ""
|
|
||||||
if top_country and bot_country:
|
if top_country and bot_country:
|
||||||
if top_country != bot_country:
|
if top_country != bot_country:
|
||||||
sent3 = (
|
sent2 = (
|
||||||
f"Within the {pillar_name} pillar, {top_country} led with a score of "
|
f"Top country: **{top_country}** ({_fmt_score(top_country_score)}); "
|
||||||
f"{_fmt_score(top_country_score)}, while {bot_country} recorded the lowest "
|
f"bottom: **{bot_country}** ({_fmt_score(bot_country_score)})."
|
||||||
f"score of {_fmt_score(bot_country_score)}."
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
sent3 = (
|
sent2 = f"**{top_country}** was the only country with data ({_fmt_score(top_country_score)})."
|
||||||
f"Within the {pillar_name} pillar, {top_country} was the only country "
|
|
||||||
f"with available data, scoring {_fmt_score(top_country_score)}."
|
|
||||||
)
|
|
||||||
|
|
||||||
if yoy_val is not None:
|
# Sentence 3: strongest / weakest pillar
|
||||||
direction_word = "improved" if yoy_val >= 0 else "declined"
|
sent3 = ""
|
||||||
sent4 = (
|
if strongest_pillar and weakest_pillar:
|
||||||
f"Compared to the previous year, the {pillar_name} pillar "
|
sent3 = (
|
||||||
f"{direction_word} by {abs(yoy_val):.2f} points"
|
f"Strongest pillar: **{strongest_pillar}** ({_fmt_score(strongest_score)}); "
|
||||||
)
|
f"weakest: **{weakest_pillar}** ({_fmt_score(weakest_score)})."
|
||||||
else:
|
|
||||||
sent4 = (
|
|
||||||
f"No prior-year data is available to calculate year-over-year change "
|
|
||||||
f"for the {pillar_name} pillar in {year}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if (most_improved_pillar and most_improved_delta is not None
|
# Sentence 4: most improved / declined pillar
|
||||||
and most_declined_pillar and most_declined_delta is not None
|
sent4_parts = []
|
||||||
and most_improved_pillar != most_declined_pillar):
|
if most_improved_pillar and most_improved_delta is not None:
|
||||||
sent4 += (
|
sent4_parts.append(f"Best gain: **{most_improved_pillar}** ({_fmt_delta(most_improved_delta)} pts)")
|
||||||
f". Across all pillars, {most_improved_pillar} showed the greatest improvement "
|
if most_declined_pillar and most_declined_delta is not None:
|
||||||
f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} "
|
sent4_parts.append(f"largest drop: **{most_declined_pillar}** ({_fmt_delta(most_declined_delta)} pts)")
|
||||||
f"recorded the largest decline ({_fmt_delta(most_declined_delta)} pts)"
|
sent4 = ("; ".join(sent4_parts) + ".") if sent4_parts else ""
|
||||||
)
|
if sent4:
|
||||||
|
sent4 = sent4[0].upper() + sent4[1:]
|
||||||
sent4 += "."
|
|
||||||
sent4 = sent4[0].upper() + sent4[1:]
|
|
||||||
|
|
||||||
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
|
||||||
|
|
||||||
@@ -610,10 +540,6 @@ class FoodSecurityAggregator:
|
|||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# METADATA BUILDER
|
# METADATA BUILDER
|
||||||
# Menyesuaikan dengan signature: save_etl_metadata(client, metadata: dict)
|
|
||||||
# dan skema etl_metadata: source_class, table_name, execution_timestamp,
|
|
||||||
# duration_seconds, rows_fetched, rows_transformed, rows_loaded,
|
|
||||||
# completeness_pct, config_snapshot, validation_metrics
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
|
|
||||||
def _build_etl_metadata(
|
def _build_etl_metadata(
|
||||||
@@ -1419,50 +1345,7 @@ class FoodSecurityAggregator:
|
|||||||
status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}"
|
status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}"
|
||||||
self.logger.info(f" -> {status} (n_checked={len(check)})")
|
self.logger.info(f" -> {status} (n_checked={len(check)})")
|
||||||
|
|
||||||
def _build_etl_metadata(
|
|
||||||
self,
|
|
||||||
table_name: str,
|
|
||||||
rows_loaded: int,
|
|
||||||
start_time: datetime,
|
|
||||||
end_time: datetime,
|
|
||||||
status: str,
|
|
||||||
error_msg: str = None,
|
|
||||||
) -> dict:
|
|
||||||
"""
|
|
||||||
Susun dict metadata sesuai signature save_etl_metadata(client, metadata: dict)
|
|
||||||
dan kolom skema etl_metadata di bigquery_helpers.py:
|
|
||||||
source_class, table_name, execution_timestamp, duration_seconds,
|
|
||||||
rows_fetched, rows_transformed, rows_loaded, completeness_pct,
|
|
||||||
config_snapshot, validation_metrics
|
|
||||||
"""
|
|
||||||
duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0
|
|
||||||
return {
|
|
||||||
"source_class" : "FoodSecurityAggregator",
|
|
||||||
"table_name" : table_name,
|
|
||||||
"execution_timestamp": start_time or end_time,
|
|
||||||
"duration_seconds" : round(duration, 4),
|
|
||||||
"rows_fetched" : rows_loaded,
|
|
||||||
"rows_transformed" : rows_loaded,
|
|
||||||
"rows_loaded" : rows_loaded,
|
|
||||||
"completeness_pct" : 100.0 if status == "success" else 0.0,
|
|
||||||
"config_snapshot" : json.dumps({
|
|
||||||
"layer" : "gold",
|
|
||||||
"write_disposition" : "WRITE_TRUNCATE",
|
|
||||||
"normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY,
|
|
||||||
"performance_threshold" : PERFORMANCE_THRESHOLD,
|
|
||||||
"status" : status,
|
|
||||||
}),
|
|
||||||
"validation_metrics" : json.dumps({
|
|
||||||
"status" : status,
|
|
||||||
"error_msg": error_msg or "",
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _finalize(self, table_name: str, rows_loaded: int):
|
def _finalize(self, table_name: str, rows_loaded: int):
|
||||||
"""
|
|
||||||
Tandai tabel sukses. Catat ke etl_logs dan etl_metadata.
|
|
||||||
Pemanggilan: save_etl_metadata(client, metadata_dict)
|
|
||||||
"""
|
|
||||||
end_time = datetime.now()
|
end_time = datetime.now()
|
||||||
start_time = self.load_metadata[table_name].get("start_time")
|
start_time = self.load_metadata[table_name].get("start_time")
|
||||||
|
|
||||||
@@ -1486,7 +1369,6 @@ class FoodSecurityAggregator:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
except Exception as meta_err:
|
except Exception as meta_err:
|
||||||
# Error metadata tidak boleh menghentikan pipeline
|
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
|
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
|
||||||
)
|
)
|
||||||
@@ -1494,10 +1376,6 @@ class FoodSecurityAggregator:
|
|||||||
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
|
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
|
||||||
|
|
||||||
def _fail(self, table_name: str, error: Exception):
|
def _fail(self, table_name: str, error: Exception):
|
||||||
"""
|
|
||||||
Tandai tabel gagal. Catat ke etl_logs dan etl_metadata.
|
|
||||||
Pemanggilan: save_etl_metadata(client, metadata_dict)
|
|
||||||
"""
|
|
||||||
end_time = datetime.now()
|
end_time = datetime.now()
|
||||||
start_time = self.load_metadata[table_name].get("start_time")
|
start_time = self.load_metadata[table_name].get("start_time")
|
||||||
error_msg = str(error)
|
error_msg = str(error)
|
||||||
@@ -1579,10 +1457,6 @@ class FoodSecurityAggregator:
|
|||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def run_aggregation():
|
def run_aggregation():
|
||||||
"""
|
|
||||||
Airflow task: Hitung semua agregasi dari fact_asean_food_security_selected.
|
|
||||||
Dipanggil setelah analytical_layer_to_gold selesai.
|
|
||||||
"""
|
|
||||||
from scripts.bigquery_config import get_bigquery_client
|
from scripts.bigquery_config import get_bigquery_client
|
||||||
client = get_bigquery_client()
|
client = get_bigquery_client()
|
||||||
agg = FoodSecurityAggregator(client)
|
agg = FoodSecurityAggregator(client)
|
||||||
|
|||||||
Reference in New Issue
Block a user