From 76b451b2c14744e4692719f4e7b85cfd973be2b8 Mon Sep 17 00:00:00 2001 From: Debby Date: Wed, 15 Apr 2026 13:37:40 +0700 Subject: [PATCH] schedule /3 bulan dan tambah metadata --- dags/etl_food_security.py | 23 +- scripts/bigquery_aggregate_layer.py | 1208 ++++++++++++++------------- 2 files changed, 659 insertions(+), 572 deletions(-) diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py index 1c69968..2f99c23 100644 --- a/dags/etl_food_security.py +++ b/dags/etl_food_security.py @@ -2,7 +2,9 @@ AIRFLOW DAG — ETL Food Security BigQuery Kimball Data Warehouse Architecture -Schedule : Setiap 3 hari sekali (timedelta(days=3)) +Schedule : Setiap 3 bulan sekali (tanggal 1, pukul 00:00) + Cron: "0 0 1 */3 *" + -> 1 Jan, 1 Apr, 1 Jul, 1 Okt Catchup : False Kimball ETL Flow: @@ -72,7 +74,8 @@ from scripts.bigquery_dimensional_model import ( from scripts.bigquery_analytical_layer import ( run_analytical_layer, ) -from scripts.bigquery_aggregate_layer import ( +# FIXED: nama modul disesuaikan dengan nama file yang benar +from scripts.bigquery_analysis_aggregation import ( run_aggregation, ) from scripts.bigquery_aggraget_fact_selected_layer import ( @@ -87,15 +90,23 @@ default_args = { } # DAG DEFINITION +# +# schedule_interval = "0 0 1 */3 *" +# ┌───── menit : 0 +# │ ┌─── jam : 0 (tengah malam) +# │ │ ┌─ hari : 1 (tanggal 1 setiap bulan yang cocok) +# │ │ │ ┌ bulan : */3 (setiap 3 bulan -> Jan, Apr, Jul, Okt) +# │ │ │ │ ┌ hari minggu : * (semua) +# 0 0 1 */3 * with DAG( dag_id = "etl_food_security_bigquery", - description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)", + description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold) | Schedule: setiap 3 bulan", default_args = default_args, - start_date = datetime(2026, 3, 1), - schedule_interval = "0 0 */3 * *", + start_date = datetime(2026, 1, 1), + schedule_interval = "0 0 1 */3 *", # Setiap 3 bulan sekali catchup = False, - tags = ["food-security",
"bigquery", "kimball"], + tags = ["food-security", "bigquery", "kimball", "quarterly"], ) as dag: task_verify = PythonOperator( diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index edfaced..c71b995 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -644,52 +644,57 @@ class FoodSecurityAggregator: self.logger.info(f"STEP 2: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - df_normed = self._get_norm_value_df() + try: + df_normed = self._get_norm_value_df() - df = ( - df_normed - .groupby(["pillar_id", "pillar_name", "year"]) - .agg( - pillar_norm =("norm_value", "mean"), - n_indicators=("indicator_id", "nunique"), - n_countries =("country_id", "nunique"), + df = ( + df_normed + .groupby(["pillar_id", "pillar_name", "year"]) + .agg( + pillar_norm =("norm_value", "mean"), + n_indicators=("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), + ) + .reset_index() ) - .reset_index() - ) - df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) - df["rank_in_year"] = ( - df.groupby("year")["pillar_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") + df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) + df["rank_in_year"] = ( + df.groupby("year")["pillar_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") - df["pillar_id"] = df["pillar_id"].astype(int) - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["n_countries"] = safe_int(df["n_countries"], col_name="n_countries", logger=self.logger) - df["rank_in_year"] = df["rank_in_year"].astype(int) - df["pillar_norm"] = df["pillar_norm"].astype(float) - df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) + df["pillar_id"] = df["pillar_id"].astype(int) 
+ df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["n_countries"] = safe_int(df["n_countries"], col_name="n_countries", logger=self.logger) + df["rank_in_year"] = df["rank_in_year"].astype(int) + df["pillar_norm"] = df["pillar_norm"].astype(float) + df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) - schema = [ - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df + schema = [ + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return 
df + + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # STEP 3: agg_pillar_by_country @@ -702,47 +707,52 @@ class FoodSecurityAggregator: self.logger.info(f"STEP 3: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - df_normed = self._get_norm_value_df() + try: + df_normed = self._get_norm_value_df() - df = ( - df_normed - .groupby(["country_id", "country_name", "pillar_id", "pillar_name", "year"]) - .agg(pillar_country_norm=("norm_value", "mean")) - .reset_index() - ) + df = ( + df_normed + .groupby(["country_id", "country_name", "pillar_id", "pillar_name", "year"]) + .agg(pillar_country_norm=("norm_value", "mean")) + .reset_index() + ) - df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) - df["rank_in_pillar_year"] = ( - df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") + df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) + df["rank_in_pillar_year"] = ( + df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") - df["country_id"] = df["country_id"].astype(int) - df["pillar_id"] = df["pillar_id"].astype(int) - df["year"] = df["year"].astype(int) - df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) - df["pillar_country_norm"] = df["pillar_country_norm"].astype(float) - df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) + df["country_id"] = df["country_id"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["year"] = df["year"].astype(int) + df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) + df["pillar_country_norm"] = 
df["pillar_country_norm"].astype(float) + df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # STEP 4: agg_framework_by_country @@ -781,30 +791,14 @@ class FoodSecurityAggregator: 
self.logger.info(f"STEP 4: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - country_composite = self._calc_country_composite_inmemory() - df_normed = self._get_norm_value_df() - parts = [] + try: + country_composite = self._calc_country_composite_inmemory() + df_normed = self._get_norm_value_df() + parts = [] - # TOTAL - agg_total = ( - country_composite[[ - "country_id", "country_name", "year", - "score_1_100", "n_indicators", "composite_score" - ]] - .copy() - .rename(columns={ - "score_1_100" : "framework_score_1_100", - "composite_score": "framework_norm", - }) - ) - agg_total["framework"] = "Total" - parts.append(agg_total) - - # MDGs pre-SDGs - pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() - if not pre_sdgs_rows.empty: - mdgs_pre = ( - pre_sdgs_rows[[ + # TOTAL + agg_total = ( + country_composite[[ "country_id", "country_name", "year", "score_1_100", "n_indicators", "composite_score" ]] @@ -814,98 +808,119 @@ class FoodSecurityAggregator: "composite_score": "framework_norm", }) ) - mdgs_pre["framework"] = "MDGs" - parts.append(mdgs_pre) + agg_total["framework"] = "Total" + parts.append(agg_total) - # MDGs mixed (year >= sdgs_start_year, hanya indikator MDGs) - mdgs_indicator_ids = set( - self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] - ) - if mdgs_indicator_ids: - df_mdgs_mixed = df_normed[ - (df_normed["indicator_id"].isin(mdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_mdgs_mixed.empty: - agg_mdgs_mixed = ( - df_mdgs_mixed - .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique"), - ) - .reset_index() + # MDGs pre-SDGs + pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() + if not pre_sdgs_rows.empty: + mdgs_pre = ( + pre_sdgs_rows[[ + "country_id", "country_name", "year", + 
"score_1_100", "n_indicators", "composite_score" + ]] + .copy() + .rename(columns={ + "score_1_100" : "framework_score_1_100", + "composite_score": "framework_norm", + }) ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) - agg_mdgs_mixed["framework"] = "MDGs" - parts.append(agg_mdgs_mixed) + mdgs_pre["framework"] = "MDGs" + parts.append(mdgs_pre) - # SDGs - sdgs_indicator_ids = set( - self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] - ) - if sdgs_indicator_ids: - df_sdgs = df_normed[ - (df_normed["indicator_id"].isin(sdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_sdgs.empty: - agg_sdgs = ( - df_sdgs - .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique"), + # MDGs mixed (year >= sdgs_start_year, hanya indikator MDGs) + mdgs_indicator_ids = set( + self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] + ) + if mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + agg_mdgs_mixed = ( + df_mdgs_mixed + .groupby(["country_id", "country_name", "year"]) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + ) + .reset_index() ) - .reset_index() - ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) - agg_sdgs["framework"] = "SDGs" - parts.append(agg_sdgs) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) + agg_mdgs_mixed["framework"] = "MDGs" + parts.append(agg_mdgs_mixed) - df = pd.concat(parts, ignore_index=True) + # SDGs + sdgs_indicator_ids = set( + 
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] + ) + if sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + agg_sdgs = ( + df_sdgs + .groupby(["country_id", "country_name", "year"]) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + ) + .reset_index() + ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) + agg_sdgs["framework"] = "SDGs" + parts.append(agg_sdgs) - if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) - if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + df = pd.concat(parts, ignore_index=True) - df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) - df["rank_in_framework_year"] = ( - df.groupby(["framework", "year"])["framework_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) - df["country_id"] = df["country_id"].astype(int) - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["rank_in_framework_year"] = safe_int(df["rank_in_framework_year"], col_name="rank_in_framework_year", logger=self.logger) - df["framework_norm"] = df["framework_norm"].astype(float) - df["framework_score_1_100"] = df["framework_score_1_100"].astype(float) + df = 
check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) + df["rank_in_framework_year"] = ( + df.groupby(["framework", "year"])["framework_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") - self._validate_mdgs_equals_total(df, level="country") + df["country_id"] = df["country_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["rank_in_framework_year"] = safe_int(df["rank_in_framework_year"], col_name="rank_in_framework_year", logger=self.logger) + df["framework_norm"] = df["framework_norm"].astype(float) + df["framework_score_1_100"] = df["framework_score_1_100"].astype(float) - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df + self._validate_mdgs_equals_total(df, level="country") + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + 
bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # STEP 5: agg_framework_asean (+ performance_status) @@ -919,181 +934,184 @@ class FoodSecurityAggregator: self.logger.info(f" performance_status threshold: {PERFORMANCE_THRESHOLD}") self.logger.info("=" * 70) - df_normed = self._get_norm_value_df() - country_composite = self._calc_country_composite_inmemory() + try: + df_normed = self._get_norm_value_df() + country_composite = self._calc_country_composite_inmemory() - country_norm = ( - df_normed - .groupby(["country_id", "country_name", "year"])["norm_value"] - .mean().reset_index() - .rename(columns={"norm_value": "country_norm"}) - ) - asean_overall = ( - country_norm.groupby("year") - .agg( - asean_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries=("country_norm", "count"), + country_norm = ( + df_normed + .groupby(["country_id", "country_name", "year"])["norm_value"] + .mean().reset_index() + .rename(columns={"norm_value": "country_norm"}) ) - .reset_index() - ) - asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) - asean_comp = ( - country_composite.groupby("year")["composite_score"] - .mean().reset_index() - .rename(columns={"composite_score": "asean_composite"}) - ) - asean_overall = asean_overall.merge(asean_comp, on="year", how="left") + asean_overall = ( + 
country_norm.groupby("year") + .agg( + asean_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries=("country_norm", "count"), + ) + .reset_index() + ) + asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) + asean_comp = ( + country_composite.groupby("year")["composite_score"] + .mean().reset_index() + .rename(columns={"composite_score": "asean_composite"}) + ) + asean_overall = asean_overall.merge(asean_comp, on="year", how="left") - parts = [] + parts = [] - # ------------------------------------------------------------------ - # Helper: hitung n_indicators per framework per year dari lookup - # ------------------------------------------------------------------ - def _n_ind(year_val, framework_val): - return self._count_framework_indicators(year_val, framework_val) + # ------------------------------------------------------------------ + # Helper: hitung n_indicators per framework per year dari lookup + # ------------------------------------------------------------------ + def _n_ind(year_val, framework_val): + return self._count_framework_indicators(year_val, framework_val) - # TOTAL - total_cols = asean_overall[[ - "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" - ]].copy().rename(columns={ - "asean_score_1_100": "framework_score_1_100", - "asean_norm" : "framework_norm", - "n_countries" : "n_countries_with_data", - }) - # n_indicators Total = semua indikator yang hadir di tahun tsb - total_cols["n_indicators"] = total_cols["year"].apply( - lambda y: int(self._ind_year_framework[ - self._ind_year_framework["year"] == y - ]["indicator_id"].nunique()) - ) - total_cols["framework"] = "Total" - parts.append(total_cols) - - # MDGs pre-SDGs - pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() - if not pre_sdgs.empty: - mdgs_pre = pre_sdgs[[ + # TOTAL + total_cols = asean_overall[[ "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" 
]].copy().rename(columns={ "asean_score_1_100": "framework_score_1_100", "asean_norm" : "framework_norm", "n_countries" : "n_countries_with_data", }) - # Pre-SDGs era: semua indikator berlabel MDGs - mdgs_pre["n_indicators"] = mdgs_pre["year"].apply( - lambda y: _n_ind(y, "MDGs") + total_cols["n_indicators"] = total_cols["year"].apply( + lambda y: int(self._ind_year_framework[ + self._ind_year_framework["year"] == y + ]["indicator_id"].nunique()) ) - mdgs_pre["framework"] = "MDGs" - parts.append(mdgs_pre) + total_cols["framework"] = "Total" + parts.append(total_cols) - # MDGs mixed - mdgs_indicator_ids = set( - self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] - ) - if mdgs_indicator_ids: - df_mdgs_mixed = df_normed[ - (df_normed["indicator_id"].isin(mdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_mdgs_mixed.empty: - cn = ( - df_mdgs_mixed - .groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) - asean_mdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data=("country_id", "count"), - ).reset_index() - asean_mdgs["n_indicators"] = asean_mdgs["year"].apply( + # MDGs pre-SDGs + pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() + if not pre_sdgs.empty: + mdgs_pre = pre_sdgs[[ + "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" + ]].copy().rename(columns={ + "asean_score_1_100": "framework_score_1_100", + "asean_norm" : "framework_norm", + "n_countries" : "n_countries_with_data", + }) + mdgs_pre["n_indicators"] = mdgs_pre["year"].apply( lambda y: _n_ind(y, "MDGs") ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) - asean_mdgs["framework"] = "MDGs" - parts.append(asean_mdgs) + mdgs_pre["framework"] = "MDGs" + 
parts.append(mdgs_pre) - # SDGs - sdgs_indicator_ids = set( - self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] - ) - if sdgs_indicator_ids: - df_sdgs = df_normed[ - (df_normed["indicator_id"].isin(sdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_sdgs.empty: - cn = ( - df_sdgs - .groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) - asean_sdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data=("country_id", "count"), - ).reset_index() - asean_sdgs["n_indicators"] = asean_sdgs["year"].apply( - lambda y: _n_ind(y, "SDGs") - ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) - asean_sdgs["framework"] = "SDGs" - parts.append(asean_sdgs) + # MDGs mixed + mdgs_indicator_ids = set( + self._ind_year_framework[self._ind_year_framework["framework"] == "MDGs"]["indicator_id"] + ) + if mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + cn = ( + df_mdgs_mixed + .groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) + asean_mdgs = cn.groupby("year").agg( + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data=("country_id", "count"), + ).reset_index() + asean_mdgs["n_indicators"] = asean_mdgs["year"].apply( + lambda y: _n_ind(y, "MDGs") + ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) + asean_mdgs["framework"] = "MDGs" + parts.append(asean_mdgs) - df = pd.concat(parts, ignore_index=True) + # SDGs + sdgs_indicator_ids = set( + 
self._ind_year_framework[self._ind_year_framework["framework"] == "SDGs"]["indicator_id"] + ) + if sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + cn = ( + df_sdgs + .groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) + asean_sdgs = cn.groupby("year").agg( + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data=("country_id", "count"), + ).reset_index() + asean_sdgs["n_indicators"] = asean_sdgs["year"].apply( + lambda y: _n_ind(y, "SDGs") + ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) + asean_sdgs["framework"] = "SDGs" + parts.append(asean_sdgs) - if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) - if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + df = pd.concat(parts, ignore_index=True) - df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) - df = add_yoy(df, ["framework"], "framework_score_1_100") + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) - # performance_status - df["performance_status"] = df["framework_score_1_100"].apply(_performance_status) + df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) + df = add_yoy(df, ["framework"], "framework_score_1_100") - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - 
df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) - for col in ["framework_norm", "std_norm", "framework_score_1_100"]: - df[col] = df[col].astype(float) - df["performance_status"] = df["performance_status"].astype(str) + # performance_status + df["performance_status"] = df["framework_score_1_100"].apply(_performance_status) - self._validate_mdgs_equals_total(df, level="asean") + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) + for col in ["framework_norm", "std_norm", "framework_score_1_100"]: + df[col] = df[col].astype(float) + df["performance_status"] = df["performance_status"].astype(str) - # Log performance summary - self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):") - for fw in df["framework"].unique(): - sub = df[df["framework"] == fw].sort_values("year") - for _, r in sub.iterrows(): - self.logger.info( - f" {fw:<8} {int(r['year'])}: " - f"score={r['framework_score_1_100']:.2f} " - f"n_ind={int(r['n_indicators'])} " - f"-> {r['performance_status']}" - ) + self._validate_mdgs_equals_total(df, level="asean") - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_countries_with_data", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("performance_status", "STRING", mode="REQUIRED"), - ] - rows = 
load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df + # Log performance summary + self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):") + for fw in df["framework"].unique(): + sub = df[df["framework"] == fw].sort_values("year") + for _, r in sub.iterrows(): + self.logger.info( + f" {fw:<8} {int(r['year'])}: " + f"score={r['framework_score_1_100']:.2f} " + f"n_ind={int(r['n_indicators'])} " + f"-> {r['performance_status']}" + ) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries_with_data", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("performance_status", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # STEP 6: agg_narrative_overview @@ -1110,141 +1128,146 @@ class FoodSecurityAggregator: self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - asean_total = ( - df_framework_asean[df_framework_asean["framework"] == "Total"] - .sort_values("year") - .reset_index(drop=True) - ) - - score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) - status_by_year = 
dict(zip(asean_total["year"].astype(int), asean_total["performance_status"].astype(str))) - - country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() - - records = [] - - for _, row in asean_total.iterrows(): - yr = int(row["year"]) - score = float(row["framework_score_1_100"]) - perf_status = str(row["performance_status"]) - yoy = row["year_over_year_change"] - yoy_val = float(yoy) if pd.notna(yoy) else None - - # n_indicators per framework per year (dari lookup) - n_mdg = self._count_framework_indicators(yr, "MDGs") - n_sdg = self._count_framework_indicators(yr, "SDGs") - n_total_ind = int( - self._ind_year_framework[ - self._ind_year_framework["year"] == yr - ]["indicator_id"].nunique() - ) - - prev_score = score_by_year.get(yr - 1, None) - prev_status = status_by_year.get(yr - 1, "N/A") - - yoy_pct = ( - (yoy_val / prev_score * 100) - if (yoy_val is not None and prev_score is not None and prev_score != 0) - else None - ) - - yr_country = ( - country_total[country_total["year"] == yr] - .sort_values("rank_in_framework_year") + try: + asean_total = ( + df_framework_asean[df_framework_asean["framework"] == "Total"] + .sort_values("year") .reset_index(drop=True) ) - ranking_list = [] - for _, cr in yr_country.iterrows(): - cr_yoy = cr.get("year_over_year_change", None) - ranking_list.append({ - "rank": int(cr["rank_in_framework_year"]), - "country_name": str(cr["country_name"]), - "score": round(float(cr["framework_score_1_100"]), 2), - "yoy_change": round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) + status_by_year = dict(zip(asean_total["year"].astype(int), asean_total["performance_status"].astype(str))) + + country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() + + records = [] + + for _, row in asean_total.iterrows(): + yr = int(row["year"]) + score = 
float(row["framework_score_1_100"]) + perf_status = str(row["performance_status"]) + yoy = row["year_over_year_change"] + yoy_val = float(yoy) if pd.notna(yoy) else None + + # n_indicators per framework per year (dari lookup) + n_mdg = self._count_framework_indicators(yr, "MDGs") + n_sdg = self._count_framework_indicators(yr, "SDGs") + n_total_ind = int( + self._ind_year_framework[ + self._ind_year_framework["year"] == yr + ]["indicator_id"].nunique() + ) + + prev_score = score_by_year.get(yr - 1, None) + prev_status = status_by_year.get(yr - 1, "N/A") + + yoy_pct = ( + (yoy_val / prev_score * 100) + if (yoy_val is not None and prev_score is not None and prev_score != 0) + else None + ) + + yr_country = ( + country_total[country_total["year"] == yr] + .sort_values("rank_in_framework_year") + .reset_index(drop=True) + ) + + ranking_list = [] + for _, cr in yr_country.iterrows(): + cr_yoy = cr.get("year_over_year_change", None) + ranking_list.append({ + "rank": int(cr["rank_in_framework_year"]), + "country_name": str(cr["country_name"]), + "score": round(float(cr["framework_score_1_100"]), 2), + "yoy_change": round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + }) + country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) + + yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) + if not yr_country_yoy.empty: + best_idx = yr_country_yoy["year_over_year_change"].idxmax() + worst_idx = yr_country_yoy["year_over_year_change"].idxmin() + most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) + most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) + most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) + most_declined_delta = round(float(yr_country_yoy.loc[worst_idx, "year_over_year_change"]), 2) + else: + most_improved_country = most_declined_country = None + most_improved_delta = most_declined_delta = None + + narrative = _build_overview_narrative( + year = yr, + 
n_mdg = n_mdg, + n_sdg = n_sdg, + n_total_ind = n_total_ind, + score = score, + performance_status = perf_status, + yoy_val = yoy_val, + yoy_pct = yoy_pct, + prev_year = yr - 1, + prev_score = prev_score, + prev_performance_status = prev_status, + ranking_list = ranking_list, + most_improved_country = most_improved_country, + most_improved_delta = most_improved_delta, + most_declined_country = most_declined_country, + most_declined_delta = most_declined_delta, + ) + + records.append({ + "year": yr, + "n_mdg_indicators": n_mdg, + "n_sdg_indicators": n_sdg, + "n_total_indicators": n_total_ind, + "asean_total_score": round(score, 2), + "performance_status": perf_status, + "yoy_change": yoy_val, + "yoy_change_pct": round(yoy_pct, 2) if yoy_pct is not None else None, + "country_ranking_json": country_ranking_json, + "most_improved_country": most_improved_country, + "most_improved_delta": most_improved_delta, + "most_declined_country": most_declined_country, + "most_declined_delta": most_declined_delta, + "narrative_overview": narrative, }) - country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) - yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) - if not yr_country_yoy.empty: - best_idx = yr_country_yoy["year_over_year_change"].idxmax() - worst_idx = yr_country_yoy["year_over_year_change"].idxmin() - most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) - most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) - most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) - most_declined_delta = round(float(yr_country_yoy.loc[worst_idx, "year_over_year_change"]), 2) - else: - most_improved_country = most_declined_country = None - most_improved_delta = most_declined_delta = None + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["n_mdg_indicators"] = df["n_mdg_indicators"].astype(int) + df["n_sdg_indicators"] = df["n_sdg_indicators"].astype(int) + 
df["n_total_indicators"] = df["n_total_indicators"].astype(int) + df["asean_total_score"] = df["asean_total_score"].astype(float) + df["performance_status"] = df["performance_status"].astype(str) + for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) - narrative = _build_overview_narrative( - year = yr, - n_mdg = n_mdg, - n_sdg = n_sdg, - n_total_ind = n_total_ind, - score = score, - performance_status = perf_status, - yoy_val = yoy_val, - yoy_pct = yoy_pct, - prev_year = yr - 1, - prev_score = prev_score, - prev_performance_status = prev_status, - ranking_list = ranking_list, - most_improved_country = most_improved_country, - most_improved_delta = most_improved_delta, - most_declined_country = most_declined_country, - most_declined_delta = most_declined_delta, + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_sdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_total_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("asean_total_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("performance_status", "STRING", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_change_pct", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("country_ranking_json", "STRING", mode="REQUIRED"), + bigquery.SchemaField("most_improved_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + 
write_disposition="WRITE_TRUNCATE", schema=schema, ) + self._finalize(table_name, rows) + return df - records.append({ - "year": yr, - "n_mdg_indicators": n_mdg, - "n_sdg_indicators": n_sdg, - "n_total_indicators": n_total_ind, - "asean_total_score": round(score, 2), - "performance_status": perf_status, - "yoy_change": yoy_val, - "yoy_change_pct": round(yoy_pct, 2) if yoy_pct is not None else None, - "country_ranking_json": country_ranking_json, - "most_improved_country": most_improved_country, - "most_improved_delta": most_improved_delta, - "most_declined_country": most_declined_country, - "most_declined_delta": most_declined_delta, - "narrative_overview": narrative, - }) - - df = pd.DataFrame(records) - df["year"] = df["year"].astype(int) - df["n_mdg_indicators"] = df["n_mdg_indicators"].astype(int) - df["n_sdg_indicators"] = df["n_sdg_indicators"].astype(int) - df["n_total_indicators"] = df["n_total_indicators"].astype(int) - df["asean_total_score"] = df["asean_total_score"].astype(float) - df["performance_status"] = df["performance_status"].astype(str) - for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: - df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) - - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_sdg_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_total_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("asean_total_score", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("performance_status", "STRING", mode="REQUIRED"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_change_pct", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("country_ranking_json", "STRING", mode="REQUIRED"), - bigquery.SchemaField("most_improved_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("most_improved_delta", 
"FLOAT", mode="NULLABLE"), - bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema, - ) - self._finalize(table_name, rows) - return df + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # STEP 7: agg_narrative_pillar @@ -1261,115 +1284,120 @@ class FoodSecurityAggregator: self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - records = [] - years = sorted(df_pillar_composite["year"].unique()) + try: + records = [] + years = sorted(df_pillar_composite["year"].unique()) - for yr in years: - yr_pillars = ( - df_pillar_composite[df_pillar_composite["year"] == yr] - .sort_values("rank_in_year") - .reset_index(drop=True) - ) - yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] - - strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None - weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None - - yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) - if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() - most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) - most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) - most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) - most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) - else: - most_improved_pillar = most_declined_pillar = None - most_improved_delta = most_declined_delta = None - - for _, prow in 
yr_pillars.iterrows(): - p_id = int(prow["pillar_id"]) - p_name = str(prow["pillar_name"]) - p_score = float(prow["pillar_score_1_100"]) - p_rank = int(prow["rank_in_year"]) - p_yoy = prow["year_over_year_change"] - p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None - - p_country = ( - yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] - .sort_values("rank_in_pillar_year") + for yr in years: + yr_pillars = ( + df_pillar_composite[df_pillar_composite["year"] == yr] + .sort_values("rank_in_year") .reset_index(drop=True) ) - if not p_country.empty: - top_country = str(p_country.iloc[0]["country_name"]) - top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) - bot_country = str(p_country.iloc[-1]["country_name"]) - bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) + yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] + + strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None + weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None + + yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) + if not yr_pillars_yoy.empty: + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) + most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) + most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) + most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) else: - top_country = bot_country = None - top_country_score = bot_country_score = None + most_improved_pillar = most_declined_pillar = None + most_improved_delta = most_declined_delta = None - narrative = _build_pillar_narrative( - year = yr, - pillar_name = p_name, - pillar_score = p_score, - rank_in_year = p_rank, - n_pillars = 
len(yr_pillars), - yoy_val = p_yoy_val, - top_country = top_country, - top_country_score = top_country_score, - bot_country = bot_country, - bot_country_score = bot_country_score, - strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, - strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, - weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, - weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, - most_improved_pillar = most_improved_pillar, - most_improved_delta = most_improved_delta, - most_declined_pillar = most_declined_pillar, - most_declined_delta = most_declined_delta, - ) + for _, prow in yr_pillars.iterrows(): + p_id = int(prow["pillar_id"]) + p_name = str(prow["pillar_name"]) + p_score = float(prow["pillar_score_1_100"]) + p_rank = int(prow["rank_in_year"]) + p_yoy = prow["year_over_year_change"] + p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None - records.append({ - "year": yr, - "pillar_id": p_id, - "pillar_name": p_name, - "pillar_score": round(p_score, 2), - "rank_in_year": p_rank, - "yoy_change": p_yoy_val, - "top_country": top_country, - "top_country_score": top_country_score, - "bottom_country": bot_country, - "bottom_country_score": bot_country_score, - "narrative_pillar": narrative, - }) + p_country = ( + yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] + .sort_values("rank_in_pillar_year") + .reset_index(drop=True) + ) + if not p_country.empty: + top_country = str(p_country.iloc[0]["country_name"]) + top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) + bot_country = str(p_country.iloc[-1]["country_name"]) + bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) + else: + top_country = bot_country = None + top_country_score = bot_country_score = None - df = 
pd.DataFrame(records) - df["year"] = df["year"].astype(int) - df["pillar_id"] = df["pillar_id"].astype(int) - df["rank_in_year"] = df["rank_in_year"].astype(int) - for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: - df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + narrative = _build_pillar_narrative( + year = yr, + pillar_name = p_name, + pillar_score = p_score, + rank_in_year = p_rank, + n_pillars = len(yr_pillars), + yoy_val = p_yoy_val, + top_country = top_country, + top_country_score = top_country_score, + bot_country = bot_country, + bot_country_score = bot_country_score, + strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, + strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, + weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, + weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, + most_improved_pillar = most_improved_pillar, + most_improved_delta = most_improved_delta, + most_declined_pillar = most_declined_pillar, + most_declined_delta = most_declined_delta, + ) - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("top_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_pillar", "STRING", 
mode="REQUIRED"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema, - ) - self._finalize(table_name, rows) - return df + records.append({ + "year": yr, + "pillar_id": p_id, + "pillar_name": p_name, + "pillar_score": round(p_score, 2), + "rank_in_year": p_rank, + "yoy_change": p_yoy_val, + "top_country": top_country, + "top_country_score": top_country_score, + "bottom_country": bot_country, + "bottom_country_score": bot_country_score, + "narrative_pillar": narrative, + }) + + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["rank_in_year"] = df["rank_in_year"].astype(int) + for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("top_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema, + ) + self._finalize(table_name, rows) + return df + + except Exception as e: + self._fail(table_name, e) + raise # ========================================================================= # HELPERS @@ -1392,16 +1420,64 @@ class 
def _finalize(self, table_name: str, rows_loaded: int):
    """
    Mark a table load as successful and record it in etl_logs and etl_metadata.

    Args:
        table_name: Name of the Gold table that was just loaded.
        rows_loaded: Number of rows written, forwarded to both log tables.

    The start time is read from ``self.load_metadata[table_name]["start_time"]``;
    if the step never stored one, ``None`` is forwarded to ``save_etl_metadata``.
    Errors raised by the bookkeeping calls are NOT swallowed here, so a
    missing audit record is reported through the caller's failure path.
    """
    end_time = datetime.now()

    # setdefault avoids a KeyError when the step did not register its
    # metadata entry before calling _finalize.
    meta = self.load_metadata.setdefault(table_name, {})
    start_time = meta.get("start_time")

    meta.update({
        "rows_loaded": rows_loaded,
        "status": "success",
        "end_time": end_time,
    })

    # Short summary row in etl_logs.
    log_update(self.client, "DW", table_name, "full_load", rows_loaded)

    # Detailed row in etl_metadata (timing, status, row count).
    save_etl_metadata(
        client=self.client,
        table_name=table_name,
        layer="gold",
        rows_loaded=rows_loaded,
        start_time=start_time,
        end_time=end_time,
        status="success",
    )

    self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")


def _fail(self, table_name: str, error: Exception):
    """
    Mark a table load as failed and record the error in etl_logs and etl_metadata.

    Args:
        table_name: Name of the Gold table whose step failed.
        error: The exception that aborted the step; its ``str()`` is stored
            as the error message.

    This runs inside the callers' ``except`` blocks, which re-raise the
    original exception right after it returns. It must therefore never raise
    itself: the failure is logged locally first, and each audit write is
    best-effort, so a bookkeeping problem cannot hide the real error.
    """
    end_time = datetime.now()
    error_msg = str(error)

    # Log locally before touching BigQuery so the root cause is always
    # visible, even if the audit writes below fail as well.
    self.logger.error(f" [FAIL] {table_name}: {error_msg}")

    # The step may have failed before its metadata entry was created.
    meta = self.load_metadata.setdefault(table_name, {})
    start_time = meta.get("start_time")
    meta.update({
        "status": "failed",
        "end_time": end_time,
    })

    # Failure row in etl_logs (best-effort).
    try:
        log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg)
    except Exception:
        self.logger.exception(f" [FAIL] {table_name}: could not write failure to etl_logs")

    # Failure row in etl_metadata, including the error message (best-effort).
    try:
        save_etl_metadata(
            client=self.client,
            table_name=table_name,
            layer="gold",
            rows_loaded=0,
            start_time=start_time,
            end_time=end_time,
            status="failed",
            error_msg=error_msg,
        )
    except Exception:
        self.logger.exception(f" [FAIL] {table_name}: could not write failure to etl_metadata")

# =========================================================================
# RUN