From 1d732167f5d0aa3a5190f538283fef790e077d0a Mon Sep 17 00:00:00 2001 From: Debby Date: Thu, 26 Mar 2026 17:27:59 +0700 Subject: [PATCH] create naration --- scripts/bigquery_aggregate_layer.py | 566 +++++++++++++++++++++++++++- 1 file changed, 558 insertions(+), 8 deletions(-) diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index 15ed0a0..cbb97aa 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -1,11 +1,13 @@ """ BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION Semua agregasi pakai norm_value dari _get_norm_value_df() -FIXED: Hanya simpan 4 tabel ke fs_asean_gold (layer='gold'): +UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_pillar_composite - agg_pillar_by_country - agg_framework_by_country - agg_framework_asean + - agg_narrative_overview (NEW) + - agg_narrative_pillar (NEW) """ import pandas as pd @@ -129,6 +131,245 @@ def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger= return df +# ============================================================================= +# NARRATIVE BUILDER FUNCTIONS (pure functions, easy to unit-test) +# ============================================================================= + +def _fmt_score(score) -> str: + """Format score to 2 decimal places.""" + if score is None or (isinstance(score, float) and np.isnan(score)): + return "N/A" + return f"{score:.2f}" + + +def _fmt_delta(delta) -> str: + """Format YoY delta with sign and 2 decimal places.""" + if delta is None or (isinstance(delta, float) and np.isnan(delta)): + return "N/A" + sign = "+" if delta >= 0 else "" + return f"{sign}{delta:.2f}" + + +def _build_overview_narrative( + year: int, + n_mdg: int, + n_sdg: int, + n_total_ind: int, + score: float, + yoy_val, + yoy_pct, + prev_year: int, + prev_score, + ranking_list: list, + most_improved_country, + most_improved_delta, + most_declined_country, + most_declined_delta, +) -> str: + """ + Compose a full English prose narrative for the Overview tab. + + Narrative structure + ------------------- + 1. Indicator composition (MDGs first, then SDGs) + 2. ASEAN score + YoY + 3. Country ranking + 4. Most improved / declined country + """ + + # -- Sentence 1: indicator composition ---------------------------------- + parts_ind = [] + if n_mdg > 0: + parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") + if n_sdg > 0: + parts_ind.append(f"{n_sdg} SDG indicator{'s' if n_sdg > 1 else ''}") + + if parts_ind: + ind_detail = " and ".join(parts_ind) + sent1 = ( + f"In {year}, the ASEAN food security assessment incorporated a total of " + f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}, " + f"consisting of {ind_detail}." + ) + else: + sent1 = ( + f"In {year}, the ASEAN food security assessment incorporated " + f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." + ) + + # -- Sentence 2: ASEAN score + YoY ------------------------------------- + if yoy_val is not None and prev_score is not None: + direction_word = "increasing" if yoy_val >= 0 else "decreasing" + pct_clause = "" + if yoy_pct is not None: + abs_pct = abs(yoy_pct) + trend_word = "improvement" if yoy_val >= 0 else "decline" + pct_clause = f", which represents a {abs_pct:.2f}% {trend_word} year-over-year" + sent2 = ( + f"The ASEAN overall score (Total framework) reached {_fmt_score(score)}, " + f"{direction_word} by {abs(yoy_val):.2f} points compared to the previous year " + f"({_fmt_score(prev_score)} in {prev_year}){pct_clause}." + ) + else: + sent2 = ( + f"The ASEAN overall score (Total framework) reached {_fmt_score(score)} in {year}; " + f"no prior-year data is available for year-over-year comparison." + ) + + # -- Sentence 3: country ranking ---------------------------------------- + sent3 = "" + if ranking_list: + top3 = ranking_list[:3] + last = ranking_list[-1] + if len(top3) >= 3: + after_first = ( + f"{top3[1]['country_name']} ({_fmt_score(top3[1]['score'])}) and " + f"{top3[2]['country_name']} ({_fmt_score(top3[2]['score'])})." + ) + elif len(top3) == 2: + after_first = f"{top3[1]['country_name']} ({_fmt_score(top3[1]['score'])})." + else: + after_first = "." + + sent3 = ( + f"In terms of country performance, {top3[0]['country_name']} led the region " + f"with a score of {_fmt_score(top3[0]['score'])}, followed by {after_first} " + f"At the other end, {last['country_name']} recorded the lowest score " + f"of {_fmt_score(last['score'])} in {year}." + ) + + # -- Sentence 4: most improved / declined ------------------------------ + sent4_parts = [] + if most_improved_country and most_improved_delta is not None: + sent4_parts.append( + f"the most notable improvement was seen in {most_improved_country}, " + f"which gained {_fmt_delta(most_improved_delta)} points from the previous year" + ) + if most_declined_country and most_declined_delta is not None: + if most_declined_delta < 0: + sent4_parts.append( + f"while {most_declined_country} experienced the largest decline " + f"of {_fmt_delta(most_declined_delta)} points" + ) + else: + sent4_parts.append( + f"while {most_declined_country} recorded the smallest gain " + f"of {_fmt_delta(most_declined_delta)} points" + ) + + sent4 = "" + if sent4_parts: + sent4 = ", ".join(sent4_parts) + "." + sent4 = sent4[0].upper() + sent4[1:] + + # -- Assemble ---------------------------------------------------------- + return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) + + +def _build_pillar_narrative( + year: int, + pillar_name: str, + pillar_score: float, + rank_in_year: int, + n_pillars: int, + yoy_val, + top_country, + top_country_score, + bot_country, + bot_country_score, + strongest_pillar, + strongest_score, + weakest_pillar, + weakest_score, + most_improved_pillar, + most_improved_delta, + most_declined_pillar, + most_declined_delta, +) -> str: + """ + Compose a full English prose narrative for a single pillar in a given year. + + Narrative structure + ------------------- + 1. Pillar score and rank + 2. Strongest / weakest pillar context + 3. Top / bottom country within this pillar + 4. YoY movement for this pillar + biggest mover across all pillars + """ + + # -- Sentence 1: pillar overview ---------------------------------------- + rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") + sent1 = ( + f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " + f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." + ) + + # -- Sentence 2: strongest / weakest context ---------------------------- + sent2 = "" + if strongest_pillar and weakest_pillar: + if strongest_pillar == pillar_name: + sent2 = ( + f"This made {pillar_name} the strongest performing pillar in {year}, " + f"compared to the weakest pillar, {weakest_pillar}, " + f"which scored {_fmt_score(weakest_score)}." + ) + elif weakest_pillar == pillar_name: + sent2 = ( + f"This made {pillar_name} the weakest performing pillar in {year}, " + f"compared to the strongest pillar, {strongest_pillar}, " + f"which scored {_fmt_score(strongest_score)}." + ) + else: + sent2 = ( + f"Across all pillars in {year}, {strongest_pillar} was the strongest " + f"(score: {_fmt_score(strongest_score)}), while {weakest_pillar} " + f"was the weakest (score: {_fmt_score(weakest_score)})." + ) + + # -- Sentence 3: country top / bottom within this pillar --------------- + sent3 = "" + if top_country and bot_country: + if top_country != bot_country: + sent3 = ( + f"Within the {pillar_name} pillar, {top_country} led with a score of " + f"{_fmt_score(top_country_score)}, while {bot_country} recorded the lowest " + f"score of {_fmt_score(bot_country_score)}." + ) + else: + sent3 = ( + f"Within the {pillar_name} pillar, {top_country} was the only country " + f"with available data, scoring {_fmt_score(top_country_score)}." + ) + + # -- Sentence 4: YoY movement ------------------------------------------- + if yoy_val is not None: + direction_word = "improved" if yoy_val >= 0 else "declined" + sent4 = ( + f"Compared to the previous year, the {pillar_name} pillar " + f"{direction_word} by {abs(yoy_val):.2f} points" + ) + else: + sent4 = ( + f"No prior-year data is available to calculate year-over-year change " + f"for the {pillar_name} pillar in {year}" + ) + + if most_improved_pillar and most_improved_delta is not None \ + and most_declined_pillar and most_declined_delta is not None \ + and most_improved_pillar != most_declined_pillar: + sent4 += ( + f". Across all pillars, {most_improved_pillar} showed the greatest improvement " + f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " + f"recorded the largest decline ({_fmt_delta(most_declined_delta)} pts)" + ) + + sent4 += "." + sent4 = sent4[0].upper() + sent4[1:] + + # -- Assemble ---------------------------------------------------------- + return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) + + # ============================================================================= # MAIN CLASS # ============================================================================= @@ -145,6 +386,8 @@ class FoodSecurityAggregator: "agg_pillar_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, "agg_framework_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, "agg_framework_asean": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_narrative_overview": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_narrative_pillar": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, } self.df = None @@ -663,6 +906,297 @@ class FoodSecurityAggregator: self._finalize(table_name, rows) return df + # ========================================================================= + # STEP 6: agg_narrative_overview -> Gold (NEW) + # + # Sumber data : df_framework_asean (framework='Total') + df_framework_by_country + # Granularity : 1 row per year + # Columns : year, n_mdg_indicators, n_sdg_indicators, n_total_indicators, + # asean_total_score, yoy_change, yoy_change_pct, + # country_ranking_json, most_improved_country, most_improved_delta, + # most_declined_country, most_declined_delta, narrative_overview + # ========================================================================= + + def calc_narrative_overview( + self, + df_framework_asean: pd.DataFrame, + df_framework_by_country: pd.DataFrame, + ) -> pd.DataFrame: + table_name = "agg_narrative_overview" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + # ASEAN-level Total framework rows, sorted by year + asean_total = ( + df_framework_asean[df_framework_asean["framework"] == "Total"] + .sort_values("year") + .reset_index(drop=True) + ) + + # Country-level Total framework rows (ranking + YoY per country) + country_total = ( + df_framework_by_country[df_framework_by_country["framework"] == "Total"] + .copy() + ) + + # Indicator counts per year per framework (self.df already has 'framework' column) + ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) + + records = [] + + for _, row in asean_total.iterrows(): + yr = int(row["year"]) + score = float(row["framework_score_1_100"]) + yoy = row["year_over_year_change"] + yoy_val = float(yoy) if pd.notna(yoy) else None + + # -- Indicator counts per framework for this year --------------- + yr_ind = ind_year[ind_year["year"] == yr] + n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique()) + n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique()) + n_total_ind = int(yr_ind["indicator_id"].nunique()) + + # -- YoY % ----------------------------------------------------- + prev_score = score - yoy_val if yoy_val is not None else None + yoy_pct = ( + (yoy_val / prev_score * 100) + if (yoy_val is not None and prev_score and prev_score != 0) + else None + ) + + # -- Country ranking for this year ----------------------------- + yr_country = ( + country_total[country_total["year"] == yr] + .sort_values("rank_in_framework_year") + .reset_index(drop=True) + ) + + ranking_list = [] + for _, cr in yr_country.iterrows(): + cr_yoy = cr.get("year_over_year_change", None) + ranking_list.append({ + "rank": int(cr["rank_in_framework_year"]), + "country_name": str(cr["country_name"]), + "score": round(float(cr["framework_score_1_100"]), 2), + "yoy_change": round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + }) + country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) + + # -- Most improved / declined country -------------------------- + yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) + if not yr_country_yoy.empty: + best_idx = yr_country_yoy["year_over_year_change"].idxmax() + worst_idx = yr_country_yoy["year_over_year_change"].idxmin() + most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) + most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) + most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) + most_declined_delta = round(float(yr_country_yoy.loc[worst_idx, "year_over_year_change"]), 2) + else: + most_improved_country = most_declined_country = None + most_improved_delta = most_declined_delta = None + + # -- Build narrative ------------------------------------------- + narrative = _build_overview_narrative( + year = yr, + n_mdg = n_mdg, + n_sdg = n_sdg, + n_total_ind = n_total_ind, + score = score, + yoy_val = yoy_val, + yoy_pct = yoy_pct, + prev_year = yr - 1, + prev_score = prev_score, + ranking_list = ranking_list, + most_improved_country = most_improved_country, + most_improved_delta = most_improved_delta, + most_declined_country = most_declined_country, + most_declined_delta = most_declined_delta, + ) + + records.append({ + "year": yr, + "n_mdg_indicators": n_mdg, + "n_sdg_indicators": n_sdg, + "n_total_indicators": n_total_ind, + "asean_total_score": round(score, 2), + "yoy_change": yoy_val, + "yoy_change_pct": round(yoy_pct, 2) if yoy_pct is not None else None, + "country_ranking_json": country_ranking_json, + "most_improved_country": most_improved_country, + "most_improved_delta": most_improved_delta, + "most_declined_country": most_declined_country, + "most_declined_delta": most_declined_delta, + "narrative_overview": narrative, + }) + + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["n_mdg_indicators"] = df["n_mdg_indicators"].astype(int) + df["n_sdg_indicators"] = df["n_sdg_indicators"].astype(int) + df["n_total_indicators"] = df["n_total_indicators"].astype(int) + df["asean_total_score"] = df["asean_total_score"].astype(float) + for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_sdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_total_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("asean_total_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_change_pct", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("country_ranking_json", "STRING", mode="REQUIRED"), + bigquery.SchemaField("most_improved_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema, + ) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 7: agg_narrative_pillar -> Gold (NEW) + # + # Sumber data : df_pillar_composite + df_pillar_by_country + # Granularity : 1 row per (year, pillar_id) + # Columns : year, pillar_id, pillar_name, pillar_score, rank_in_year, + # yoy_change, top_country, top_country_score, + # bottom_country, bottom_country_score, narrative_pillar + # ========================================================================= + + def calc_narrative_pillar( + self, + df_pillar_composite: pd.DataFrame, + df_pillar_by_country: pd.DataFrame, + ) -> pd.DataFrame: + table_name = "agg_narrative_pillar" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + records = [] + years = sorted(df_pillar_composite["year"].unique()) + + for yr in years: + yr_pillars = ( + df_pillar_composite[df_pillar_composite["year"] == yr] + .sort_values("rank_in_year") + .reset_index(drop=True) + ) + yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] + + # Strongest / weakest pillar this year (for context sentence) + strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None + weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None + + # Biggest improvement / decline across all pillars this year + yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) + if not yr_pillars_yoy.empty: + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) + most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) + most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) + most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) + else: + most_improved_pillar = most_declined_pillar = None + most_improved_delta = most_declined_delta = None + + for _, prow in yr_pillars.iterrows(): + p_id = int(prow["pillar_id"]) + p_name = str(prow["pillar_name"]) + p_score = float(prow["pillar_score_1_100"]) + p_rank = int(prow["rank_in_year"]) + p_yoy = prow["year_over_year_change"] + p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None + + # Top / bottom country within this pillar & year + p_country = ( + yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] + .sort_values("rank_in_pillar_year") + .reset_index(drop=True) + ) + if not p_country.empty: + top_country = str(p_country.iloc[0]["country_name"]) + top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) + bot_country = str(p_country.iloc[-1]["country_name"]) + bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) + else: + top_country = bot_country = None + top_country_score = bot_country_score = None + + # -- Build narrative --------------------------------------- + narrative = _build_pillar_narrative( + year = yr, + pillar_name = p_name, + pillar_score = p_score, + rank_in_year = p_rank, + n_pillars = len(yr_pillars), + yoy_val = p_yoy_val, + top_country = top_country, + top_country_score = top_country_score, + bot_country = bot_country, + bot_country_score = bot_country_score, + strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, + strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, + weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, + weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, + most_improved_pillar = most_improved_pillar, + most_improved_delta = most_improved_delta, + most_declined_pillar = most_declined_pillar, + most_declined_delta = most_declined_delta, + ) + + records.append({ + "year": yr, + "pillar_id": p_id, + "pillar_name": p_name, + "pillar_score": round(p_score, 2), + "rank_in_year": p_rank, + "yoy_change": p_yoy_val, + "top_country": top_country, + "top_country_score": top_country_score, + "bottom_country": bot_country, + "bottom_country_score": bot_country_score, + "narrative_pillar": narrative, + }) + + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["rank_in_year"] = df["rank_in_year"].astype(int) + for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("top_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema, + ) + self._finalize(table_name, rows) + return df + # ========================================================================= # HELPERS # ========================================================================= @@ -697,21 +1231,36 @@ class FoodSecurityAggregator: log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) # ========================================================================= - # RUN + # RUN — 6 tabel (4 lama + 2 narrative baru) # ========================================================================= def run(self): start = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info("FOOD SECURITY AGGREGATION v8.0 — 4 TABLES -> fs_asean_gold") + self.logger.info("FOOD SECURITY AGGREGATION v9.0 — 6 TABLES -> fs_asean_gold") + self.logger.info(" agg_pillar_composite | agg_pillar_by_country") + self.logger.info(" agg_framework_by_country| agg_framework_asean") + self.logger.info(" agg_narrative_overview | agg_narrative_pillar") self.logger.info("=" * 70) self.load_data() self._classify_indicators() - self.calc_pillar_composite() - self.calc_pillar_by_country() - self.calc_framework_by_country() - self.calc_framework_asean() + + # -- 4 tabel lama (tidak ada perubahan) ---------------------------- + df_pillar_composite = self.calc_pillar_composite() + df_pillar_by_country = self.calc_pillar_by_country() + df_framework_by_country = self.calc_framework_by_country() + df_framework_asean = self.calc_framework_asean() + + # -- 2 tabel narrative baru ---------------------------------------- + self.calc_narrative_overview( + df_framework_asean = df_framework_asean, + df_framework_by_country = df_framework_by_country, + ) + self.calc_narrative_pillar( + df_pillar_composite = df_pillar_composite, + df_pillar_by_country = df_pillar_by_country, + ) duration = (datetime.now() - start).total_seconds() total_rows = sum(m["rows_loaded"] for m in self.load_metadata.values()) @@ -734,6 +1283,7 @@ def run_aggregation(): """ Airflow task: Hitung semua agregasi dari analytical_food_security. Dipanggil setelah analytical_layer_to_gold selesai. + Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative. """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() @@ -756,7 +1306,7 @@ if __name__ == "__main__": _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") print("=" * 70) - print("FOOD SECURITY AGGREGATION 4 TABLES -> fs_asean_gold") + print("FOOD SECURITY AGGREGATION v9.0 — 6 TABLES -> fs_asean_gold") print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}") print("=" * 70)