finish fact dan dim

This commit is contained in:
Debby
2026-04-02 20:31:19 +07:00
parent 47ea9c0492
commit d4bee86331
2 changed files with 153 additions and 141 deletions

View File

@@ -8,6 +8,8 @@ UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'):
- agg_framework_asean - agg_framework_asean
- agg_narrative_overview - agg_narrative_overview
- agg_narrative_pillar - agg_narrative_pillar
SOURCE TABLE: fact_asean_food_security_selected (sudah include nama + ID)
""" """
import pandas as pd import pandas as pd
@@ -166,18 +168,6 @@ def _build_overview_narrative(
most_declined_country, most_declined_country,
most_declined_delta, most_declined_delta,
) -> str: ) -> str:
"""
Compose a full English prose narrative for the Overview tab.
Narrative structure
-------------------
1. Indicator composition (MDGs first, then SDGs)
2. ASEAN score + YoY
3. Country ranking
4. Most improved / declined country
"""
# -- Sentence 1: indicator composition ----------------------------------
parts_ind = [] parts_ind = []
if n_mdg > 0: if n_mdg > 0:
parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}")
@@ -197,7 +187,6 @@ def _build_overview_narrative(
f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}."
) )
# -- Sentence 2: ASEAN score + YoY -------------------------------------
if yoy_val is not None and prev_score is not None: if yoy_val is not None and prev_score is not None:
direction_word = "increasing" if yoy_val >= 0 else "decreasing" direction_word = "increasing" if yoy_val >= 0 else "decreasing"
pct_clause = "" pct_clause = ""
@@ -216,7 +205,6 @@ def _build_overview_narrative(
f"no prior-year data is available for year-over-year comparison." f"no prior-year data is available for year-over-year comparison."
) )
# -- Sentence 3: country ranking ----------------------------
sent3 = "" sent3 = ""
if ranking_list: if ranking_list:
first = ranking_list[0] first = ranking_list[0]
@@ -236,7 +224,6 @@ def _build_overview_narrative(
f"{_fmt_score(last['score'])} in {year}." f"{_fmt_score(last['score'])} in {year}."
) )
else: else:
# Susun semua negara di tengah: "B (xx.xx), C (xx.xx), ..., and Y (xx.xx)"
middle_parts = [ middle_parts = [
f"{c['country_name']} ({_fmt_score(c['score'])})" f"{c['country_name']} ({_fmt_score(c['score'])})"
for c in middle for c in middle
@@ -253,7 +240,6 @@ def _build_overview_narrative(
f"of {_fmt_score(last['score'])} in {year}." f"of {_fmt_score(last['score'])} in {year}."
) )
# -- Sentence 4: most improved / declined ------------------------------
sent4_parts = [] sent4_parts = []
if most_improved_country and most_improved_delta is not None: if most_improved_country and most_improved_delta is not None:
sent4_parts.append( sent4_parts.append(
@@ -277,7 +263,6 @@ def _build_overview_narrative(
sent4 = ", ".join(sent4_parts) + "." sent4 = ", ".join(sent4_parts) + "."
sent4 = sent4[0].upper() + sent4[1:] sent4 = sent4[0].upper() + sent4[1:]
# -- Assemble ----------------------------------------------------------
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
@@ -301,25 +286,12 @@ def _build_pillar_narrative(
most_declined_pillar, most_declined_pillar,
most_declined_delta, most_declined_delta,
) -> str: ) -> str:
"""
Compose a full English prose narrative for a single pillar in a given year.
Narrative structure
-------------------
1. Pillar score and rank
2. Strongest / weakest pillar context
3. Top / bottom country within this pillar
4. YoY movement for this pillar + biggest mover across all pillars
"""
# -- Sentence 1: pillar overview ----------------------------------------
rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th")
sent1 = ( sent1 = (
f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, "
f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN."
) )
# -- Sentence 2: strongest / weakest context ----------------------------
sent2 = "" sent2 = ""
if strongest_pillar and weakest_pillar: if strongest_pillar and weakest_pillar:
if strongest_pillar == pillar_name: if strongest_pillar == pillar_name:
@@ -341,7 +313,6 @@ def _build_pillar_narrative(
f"was the weakest (score: {_fmt_score(weakest_score)})." f"was the weakest (score: {_fmt_score(weakest_score)})."
) )
# -- Sentence 3: country top / bottom within this pillar ---------------
sent3 = "" sent3 = ""
if top_country and bot_country: if top_country and bot_country:
if top_country != bot_country: if top_country != bot_country:
@@ -356,7 +327,6 @@ def _build_pillar_narrative(
f"with available data, scoring {_fmt_score(top_country_score)}." f"with available data, scoring {_fmt_score(top_country_score)}."
) )
# -- Sentence 4: YoY movement -------------------------------------------
if yoy_val is not None: if yoy_val is not None:
direction_word = "improved" if yoy_val >= 0 else "declined" direction_word = "improved" if yoy_val >= 0 else "declined"
sent4 = ( sent4 = (
@@ -381,7 +351,6 @@ def _build_pillar_narrative(
sent4 += "." sent4 += "."
sent4 = sent4[0].upper() + sent4[1:] sent4 = sent4[0].upper() + sent4[1:]
# -- Assemble ----------------------------------------------------------
return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) return " ".join(s for s in [sent1, sent2, sent3, sent4] if s)
@@ -421,33 +390,42 @@ class FoodSecurityAggregator:
self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") self.logger.info("STEP 1: LOAD DATA from fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
self.df = read_from_bigquery(self.client, "analytical_food_security", layer='gold') # -----------------------------------------------------------------------
self.logger.info(f" analytical_food_security : {len(self.df):,} rows") # CHANGED: sumber tabel -> fact_asean_food_security_selected
# Tabel ini sudah include: country_name, indicator_name, pillar_name,
self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold') # direction, year -> tidak perlu join ke dim_* lagi
self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold') # -----------------------------------------------------------------------
self.dims["pillar"] = read_from_bigquery(self.client, "dim_pillar", layer='gold') self.df = read_from_bigquery(
self.dims["time"] = read_from_bigquery(self.client, "dim_time", layer='gold') self.client, "fact_asean_food_security_selected", layer='gold'
ind_cols = ["indicator_id"]
if "direction" in self.dims["indicator"].columns:
ind_cols.append("direction")
self.df = (
self.df
.merge(self.dims["time"][["time_id", "year"]], on="time_id", how="left")
.merge(self.dims["country"][["country_id", "country_name"]], on="country_id", how="left")
.merge(self.dims["pillar"][["pillar_id", "pillar_name"]], on="pillar_id", how="left")
.merge(self.dims["indicator"][ind_cols], on="indicator_id", how="left")
) )
self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows")
if "direction" not in self.df.columns: # Validasi kolom wajib yang harus sudah ada di tabel baru
self.df["direction"] = "positive" required_cols = {
else: "country_id", "country_name",
n_null_dir = self.df["direction"].isna().sum() "indicator_id", "indicator_name", "direction",
if n_null_dir > 0: "pillar_id", "pillar_name",
self.logger.warning(f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'") "time_id", "year",
self.df["direction"] = self.df["direction"].fillna("positive") "value",
}
missing_cols = required_cols - set(self.df.columns)
if missing_cols:
raise ValueError(
f"Kolom berikut tidak ditemukan di fact_asean_food_security_selected: "
f"{missing_cols}"
)
# -----------------------------------------------------------------------
# Tidak perlu join ke dim_* lagi karena semua nama sudah ada di tabel ini.
# dim_indicator juga tidak di-load lagi: baris dengan direction NULL
# langsung diisi nilai default 'positive' (lihat fillna di bawah).
# -----------------------------------------------------------------------
n_null_dir = self.df["direction"].isna().sum()
if n_null_dir > 0:
self.logger.warning(
f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'"
)
self.df["direction"] = self.df["direction"].fillna("positive")
dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts()
self.logger.info(f"\n Distribusi direction per indikator:") self.logger.info(f"\n Distribusi direction per indikator:")
@@ -455,10 +433,12 @@ class FoodSecurityAggregator:
tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal" tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal"
self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]") self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]")
self.logger.info(f"\n Setelah join: {len(self.df):,} rows") self.logger.info(f"\n Rows loaded : {len(self.df):,}")
self.logger.info(f" Negara : {self.df['country_id'].nunique()}") self.logger.info(f" Negara : {self.df['country_id'].nunique()}")
self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}") self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}")
self.logger.info(f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}") self.logger.info(
f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}"
)
# ========================================================================= # =========================================================================
# STEP 1b: Klasifikasi indikator ke MDGs / SDGs # STEP 1b: Klasifikasi indikator ke MDGs / SDGs
@@ -496,17 +476,26 @@ class FoodSecurityAggregator:
) )
sdgs_rows = ind_min_year[ind_min_year["framework"] == "SDGs"] sdgs_rows = ind_min_year[ind_min_year["framework"] == "SDGs"]
self.sdgs_start_year = int(sdgs_rows["min_year"].min()) if not sdgs_rows.empty else int(self.df["year"].max()) + 1 self.sdgs_start_year = (
int(sdgs_rows["min_year"].min()) if not sdgs_rows.empty
else int(self.df["year"].max()) + 1
)
self.logger.info(f" sdgs_start_year: {self.sdgs_start_year}") self.logger.info(f" sdgs_start_year: {self.sdgs_start_year}")
self.mdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "MDGs"]["indicator_id"].tolist()) self.mdgs_indicator_ids = set(
self.sdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "SDGs"]["indicator_id"].tolist()) ind_min_year[ind_min_year["framework"] == "MDGs"]["indicator_id"].tolist()
)
self.sdgs_indicator_ids = set(
ind_min_year[ind_min_year["framework"] == "SDGs"]["indicator_id"].tolist()
)
self.logger.info(f" MDGs: {len(self.mdgs_indicator_ids)} indicators") self.logger.info(f" MDGs: {len(self.mdgs_indicator_ids)} indicators")
self.logger.info(f" SDGs: {len(self.sdgs_indicator_ids)} indicators") self.logger.info(f" SDGs: {len(self.sdgs_indicator_ids)} indicators")
self.df = self.df.merge(ind_min_year[["indicator_id", "framework"]], on="indicator_id", how="left") self.df = self.df.merge(
ind_min_year[["indicator_id", "framework"]], on="indicator_id", how="left"
)
# ========================================================================= # =========================================================================
# CORE HELPER: normalisasi raw value per indikator # CORE HELPER: normalisasi raw value per indikator
@@ -514,7 +503,9 @@ class FoodSecurityAggregator:
def _get_norm_value_df(self) -> pd.DataFrame: def _get_norm_value_df(self) -> pd.DataFrame:
if "framework" not in self.df.columns: if "framework" not in self.df.columns:
raise ValueError("Kolom 'framework' tidak ada. Pastikan _classify_indicators() dipanggil lebih dulu.") raise ValueError(
"Kolom 'framework' tidak ada. Pastikan _classify_indicators() dipanggil lebih dulu."
)
norm_parts = [] norm_parts = []
for ind_id, grp in self.df.groupby("indicator_id"): for ind_id, grp in self.df.groupby("indicator_id"):
@@ -596,7 +587,10 @@ class FoodSecurityAggregator:
bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"),
] ]
rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) rows = load_to_bigquery(
self.client, df, table_name, layer='gold',
write_disposition="WRITE_TRUNCATE", schema=schema
)
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
@@ -646,7 +640,10 @@ class FoodSecurityAggregator:
bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"),
] ]
rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) rows = load_to_bigquery(
self.client, df, table_name, layer='gold',
write_disposition="WRITE_TRUNCATE", schema=schema
)
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
@@ -708,7 +705,10 @@ class FoodSecurityAggregator:
pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy()
if not pre_sdgs_rows.empty: if not pre_sdgs_rows.empty:
mdgs_pre = ( mdgs_pre = (
pre_sdgs_rows[["country_id", "country_name", "year", "score_1_100", "n_indicators", "composite_score"]] pre_sdgs_rows[[
"country_id", "country_name", "year",
"score_1_100", "n_indicators", "composite_score"
]]
.copy() .copy()
.rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"})
) )
@@ -786,7 +786,10 @@ class FoodSecurityAggregator:
bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"),
] ]
rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) rows = load_to_bigquery(
self.client, df, table_name, layer='gold',
write_disposition="WRITE_TRUNCATE", schema=schema
)
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
@@ -844,7 +847,11 @@ class FoodSecurityAggregator:
"asean_norm": "framework_norm", "asean_norm": "framework_norm",
"n_countries": "n_countries_with_data", "n_countries": "n_countries_with_data",
}) })
n_ind_pre = df_normed[df_normed["year"] < self.sdgs_start_year].groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) n_ind_pre = (
df_normed[df_normed["year"] < self.sdgs_start_year]
.groupby("year")["indicator_id"].nunique()
.reset_index().rename(columns={"indicator_id": "n_indicators"})
)
mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left")
mdgs_pre["framework"] = "MDGs" mdgs_pre["framework"] = "MDGs"
parts.append(mdgs_pre) parts.append(mdgs_pre)
@@ -917,19 +924,15 @@ class FoodSecurityAggregator:
bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"),
] ]
rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) rows = load_to_bigquery(
self.client, df, table_name, layer='gold',
write_disposition="WRITE_TRUNCATE", schema=schema
)
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
# ========================================================================= # =========================================================================
# STEP 6: agg_narrative_overview -> Gold (NEW) # STEP 6: agg_narrative_overview -> Gold
#
# Sumber data : df_framework_asean (framework='Total') + df_framework_by_country
# Granularity : 1 row per year
# Columns : year, n_mdg_indicators, n_sdg_indicators, n_total_indicators,
# asean_total_score, yoy_change, yoy_change_pct,
# country_ranking_json, most_improved_country, most_improved_delta,
# most_declined_country, most_declined_delta, narrative_overview
# ========================================================================= # =========================================================================
def calc_narrative_overview( def calc_narrative_overview(
@@ -943,28 +946,22 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
# ASEAN-level Total framework rows only, sorted by year
# PENTING: filter framework='Total' dulu sebelum apapun
asean_total = ( asean_total = (
df_framework_asean[df_framework_asean["framework"] == "Total"] df_framework_asean[df_framework_asean["framework"] == "Total"]
.sort_values("year") .sort_values("year")
.reset_index(drop=True) .reset_index(drop=True)
) )
# Buat lookup score per tahun untuk ambil prev_score yang akurat
# Tidak mengandalkan score - yoy_val karena floating point bisa drift
score_by_year = dict(zip( score_by_year = dict(zip(
asean_total["year"].astype(int), asean_total["year"].astype(int),
asean_total["framework_score_1_100"].astype(float), asean_total["framework_score_1_100"].astype(float),
)) ))
# Country-level Total framework rows (ranking + YoY per country)
country_total = ( country_total = (
df_framework_by_country[df_framework_by_country["framework"] == "Total"] df_framework_by_country[df_framework_by_country["framework"] == "Total"]
.copy() .copy()
) )
# Indicator counts per year per framework (self.df already has 'framework' column)
ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"])
records = [] records = []
@@ -975,24 +972,19 @@ class FoodSecurityAggregator:
yoy = row["year_over_year_change"] yoy = row["year_over_year_change"]
yoy_val = float(yoy) if pd.notna(yoy) else None yoy_val = float(yoy) if pd.notna(yoy) else None
# -- Indicator counts per framework for this year ---------------
yr_ind = ind_year[ind_year["year"] == yr] yr_ind = ind_year[ind_year["year"] == yr]
n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique()) n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique())
n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique()) n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique())
n_total_ind = int(yr_ind["indicator_id"].nunique()) n_total_ind = int(yr_ind["indicator_id"].nunique())
# -- prev_score diambil langsung dari lookup, bukan score - yoy_val
# Ini memastikan nilai konsisten 100% dengan tabel agg_framework_asean
prev_score = score_by_year.get(yr - 1, None) prev_score = score_by_year.get(yr - 1, None)
# -- YoY % -----------------------------------------------------
yoy_pct = ( yoy_pct = (
(yoy_val / prev_score * 100) (yoy_val / prev_score * 100)
if (yoy_val is not None and prev_score is not None and prev_score != 0) if (yoy_val is not None and prev_score is not None and prev_score != 0)
else None else None
) )
# -- Country ranking for this year -----------------------------
yr_country = ( yr_country = (
country_total[country_total["year"] == yr] country_total[country_total["year"] == yr]
.sort_values("rank_in_framework_year") .sort_values("rank_in_framework_year")
@@ -1010,7 +1002,6 @@ class FoodSecurityAggregator:
}) })
country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) country_ranking_json = json.dumps(ranking_list, ensure_ascii=False)
# -- Most improved / declined country --------------------------
yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"])
if not yr_country_yoy.empty: if not yr_country_yoy.empty:
best_idx = yr_country_yoy["year_over_year_change"].idxmax() best_idx = yr_country_yoy["year_over_year_change"].idxmax()
@@ -1023,7 +1014,6 @@ class FoodSecurityAggregator:
most_improved_country = most_declined_country = None most_improved_country = most_declined_country = None
most_improved_delta = most_declined_delta = None most_improved_delta = most_declined_delta = None
# -- Build narrative -------------------------------------------
narrative = _build_overview_narrative( narrative = _build_overview_narrative(
year = yr, year = yr,
n_mdg = n_mdg, n_mdg = n_mdg,
@@ -1089,13 +1079,7 @@ class FoodSecurityAggregator:
return df return df
# ========================================================================= # =========================================================================
# STEP 7: agg_narrative_pillar -> Gold (NEW) # STEP 7: agg_narrative_pillar -> Gold
#
# Sumber data : df_pillar_composite + df_pillar_by_country
# Granularity : 1 row per (year, pillar_id)
# Columns : year, pillar_id, pillar_name, pillar_score, rank_in_year,
# yoy_change, top_country, top_country_score,
# bottom_country, bottom_country_score, narrative_pillar
# ========================================================================= # =========================================================================
def calc_narrative_pillar( def calc_narrative_pillar(
@@ -1120,11 +1104,9 @@ class FoodSecurityAggregator:
) )
yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr]
# Strongest / weakest pillar this year (for context sentence)
strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None
weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None
# Biggest improvement / decline across all pillars this year
yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"])
if not yr_pillars_yoy.empty: if not yr_pillars_yoy.empty:
best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax()
@@ -1145,7 +1127,6 @@ class FoodSecurityAggregator:
p_yoy = prow["year_over_year_change"] p_yoy = prow["year_over_year_change"]
p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None
# Top / bottom country within this pillar & year
p_country = ( p_country = (
yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] yr_country_pillar[yr_country_pillar["pillar_id"] == p_id]
.sort_values("rank_in_pillar_year") .sort_values("rank_in_pillar_year")
@@ -1160,7 +1141,6 @@ class FoodSecurityAggregator:
top_country = bot_country = None top_country = bot_country = None
top_country_score = bot_country_score = None top_country_score = bot_country_score = None
# -- Build narrative ---------------------------------------
narrative = _build_pillar_narrative( narrative = _build_pillar_narrative(
year = yr, year = yr,
pillar_name = p_name, pillar_name = p_name,
@@ -1172,10 +1152,10 @@ class FoodSecurityAggregator:
top_country_score = top_country_score, top_country_score = top_country_score,
bot_country = bot_country, bot_country = bot_country,
bot_country_score = bot_country_score, bot_country_score = bot_country_score,
strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None,
strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None,
weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None,
weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None,
most_improved_pillar = most_improved_pillar, most_improved_pillar = most_improved_pillar,
most_improved_delta = most_improved_delta, most_improved_delta = most_improved_delta,
most_declined_pillar = most_declined_pillar, most_declined_pillar = most_declined_pillar,
@@ -1257,28 +1237,27 @@ class FoodSecurityAggregator:
log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error))
# ========================================================================= # =========================================================================
# RUN — 6 tabel (4 lama + 2 narrative baru) # RUN
# ========================================================================= # =========================================================================
def run(self): def run(self):
start = datetime.now() start = datetime.now()
self.logger.info("\n" + "=" * 70) self.logger.info("\n" + "=" * 70)
self.logger.info("FOOD SECURITY AGGREGATION v9.0 — 6 TABLES -> fs_asean_gold") self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold")
self.logger.info(" agg_pillar_composite | agg_pillar_by_country") self.logger.info(" Source : fact_asean_food_security_selected")
self.logger.info(" agg_framework_by_country| agg_framework_asean") self.logger.info(" Outputs : agg_pillar_composite | agg_pillar_by_country")
self.logger.info(" agg_narrative_overview | agg_narrative_pillar") self.logger.info(" agg_framework_by_country| agg_framework_asean")
self.logger.info(" agg_narrative_overview | agg_narrative_pillar")
self.logger.info("=" * 70) self.logger.info("=" * 70)
self.load_data() self.load_data()
self._classify_indicators() self._classify_indicators()
# -- 4 tabel lama (tidak ada perubahan) ----------------------------
df_pillar_composite = self.calc_pillar_composite() df_pillar_composite = self.calc_pillar_composite()
df_pillar_by_country = self.calc_pillar_by_country() df_pillar_by_country = self.calc_pillar_by_country()
df_framework_by_country = self.calc_framework_by_country() df_framework_by_country = self.calc_framework_by_country()
df_framework_asean = self.calc_framework_asean() df_framework_asean = self.calc_framework_asean()
# -- 2 tabel narrative baru ----------------------------------------
self.calc_narrative_overview( self.calc_narrative_overview(
df_framework_asean = df_framework_asean, df_framework_asean = df_framework_asean,
df_framework_by_country = df_framework_by_country, df_framework_by_country = df_framework_by_country,
@@ -1307,9 +1286,8 @@ class FoodSecurityAggregator:
def run_aggregation(): def run_aggregation():
""" """
Airflow task: Hitung semua agregasi dari analytical_food_security. Airflow task: Hitung semua agregasi dari fact_asean_food_security_selected.
Dipanggil setelah analytical_layer_to_gold selesai. Dipanggil setelah analytical_layer_to_gold selesai.
Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative.
""" """
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client() client = get_bigquery_client()
@@ -1332,8 +1310,9 @@ if __name__ == "__main__":
_sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace")
print("=" * 70) print("=" * 70)
print("FOOD SECURITY AGGREGATION-> fs_asean_gold") print("FOOD SECURITY AGGREGATION -> fs_asean_gold")
print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}") print(f" Source : fact_asean_food_security_selected")
print(f" NORMALIZE_FRAMEWORKS_JOINTLY : {NORMALIZE_FRAMEWORKS_JOINTLY}")
print("=" * 70) print("=" * 70)
logger = setup_logging() logger = setup_logging()

View File

@@ -1,6 +1,6 @@
""" """
BIGQUERY ANALYTICAL LAYER - DATA FILTERING BIGQUERY ANALYTICAL LAYER - DATA FILTERING
FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold') FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order: Filtering Order:
1. Load data (single years only) 1. Load data (single years only)
@@ -8,7 +8,7 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (value only, normalisasi & direction handled downstream) 6. Save analytical table (dengan nama/label lengkap untuk Looker Studio)
""" """
import pandas as pd import pandas as pd
@@ -40,15 +40,15 @@ from google.cloud import bigquery
class AnalyticalLayerLoader: class AnalyticalLayerLoader:
""" """
Analytical Layer Loader for BigQuery - CORRECTED VERSION v4 Analytical Layer Loader for BigQuery
Key Logic: Key Logic:
1. Complete per country (no gaps from start_year to end_year) 1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars 2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years 3. Ensure indicators have consistent country count across all years
4. Save raw value only (normalisasi & direction handled downstream) 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio
Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
""" """
def __init__(self, client: bigquery.Client): def __init__(self, client: bigquery.Client):
@@ -424,33 +424,65 @@ class AnalyticalLayerLoader:
return year_stats return year_stats
def save_analytical_table(self): def save_analytical_table(self):
table_name = 'analytical_food_security' # ---------------------------------------------------------------
# CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio
# ---------------------------------------------------------------
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# ------------------------------------------------------------------
# Pilih kolom: ID + Nama lengkap + value
# Kolom nama memudahkan filtering/slicing langsung di Looker Studio
# tanpa perlu join ulang ke tabel dimensi.
# ------------------------------------------------------------------
analytical_df = self.df_clean[[ analytical_df = self.df_clean[[
'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value' 'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
]].copy() ]].copy()
analytical_df = analytical_df.sort_values( analytical_df = analytical_df.sort_values(
['time_id', 'country_id', 'indicator_id'] ['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True) ).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int) # Pastikan tipe data konsisten
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
# Schema BigQuery
schema = [ schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
] ]
rows_loaded = load_to_bigquery( rows_loaded = load_to_bigquery(
@@ -475,7 +507,8 @@ class AnalyticalLayerLoader:
'end_year' : self.end_year, 'end_year' : self.end_year,
'fixed_countries': len(self.selected_country_ids), 'fixed_countries': len(self.selected_country_ids),
'no_gaps' : True, 'no_gaps' : True,
'layer' : 'gold' 'layer' : 'gold',
'columns' : 'id + name + value (Looker Studio ready)'
}), }),
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
@@ -497,7 +530,7 @@ class AnalyticalLayerLoader:
self.pipeline_metadata['start_time'] = self.pipeline_start self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("Output: analytical_food_security → fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
@@ -528,7 +561,7 @@ class AnalyticalLayerLoader:
def run_analytical_layer(): def run_analytical_layer():
""" """
Airflow task: Build analytical_food_security dari fact_food_security + dims. Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims.
Dipanggil setelah dimensional_model_to_gold selesai. Dipanggil setelah dimensional_model_to_gold selesai.
""" """
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
@@ -544,7 +577,7 @@ def run_analytical_layer():
if __name__ == "__main__": if __name__ == "__main__":
print("=" * 80) print("=" * 80)
print("Output: analytical_food_security → fs_asean_gold") print("Output: fact_asean_food_security_selected → fs_asean_gold")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()