From 40528766bd69de886cc8fc666758812b2332912a Mon Sep 17 00:00:00 2001 From: Debby Date: Thu, 16 Apr 2026 08:14:10 +0700 Subject: [PATCH] add metadata aggregate --- scripts/bigquery_aggregate_layer.py | 211 +++++++++++++++++----------- 1 file changed, 130 insertions(+), 81 deletions(-) diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index c71b995..a78a1de 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -252,7 +252,6 @@ def _build_overview_narrative( trend_word = "improvement" if yoy_val >= 0 else "decline" pct_clause = f", representing a {abs_pct:.2f}% {trend_word} year-over-year" - # Note if performance status changed status_change = "" if prev_performance_status not in ("N/A", None) and prev_performance_status != performance_status: status_change = ( @@ -444,7 +443,6 @@ class FoodSecurityAggregator: self.sdgs_start_year = None # Lookup: (indicator_id, year) -> framework label - # Dibangun di _assign_framework_labels(), dipakai di _count_framework_indicators() self._ind_year_framework: pd.DataFrame = None # ========================================================================= @@ -495,11 +493,9 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 1b: Detect sdgs_start_year + assign framework per (indicator, year) - # Konsisten dengan logika di bigquery_aggraget_fact_selected_layer.py # ========================================================================= def _detect_sdgs_start_year(self) -> int: - """Deteksi sdgs_start_year dari kehadiran FIES di data (metode eksplisit).""" fies_rows = self.df[ self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER) ] @@ -508,7 +504,6 @@ class FoodSecurityAggregator: self.logger.info(f" [FIES explicit] sdgs_start_year = {sdgs_start}") return sdgs_start - # Fallback: gap terbesar pada distribusi min_year ind_min_year = ( self.df.groupby("indicator_id")["year"] .min().reset_index().rename(columns={"year": "min_year"}) @@ -528,17 +523,6 @@ class FoodSecurityAggregator: return int(y_after) def _assign_framework_labels(self): - """ - Buat lookup table _ind_year_framework: DataFrame(indicator_id, year, framework). - - Aturan (identik dengan IndicatorNormAggregator._assign_framework): - - Indikator TIDAK di SDG_ONLY_KEYWORDS -> selalu "MDGs" - - Indikator DI SDG_ONLY_KEYWORDS: - year < sdgs_start_year -> "MDGs" - year >= sdgs_start_year -> "SDGs" - - Juga attach kolom 'framework' ke self.df untuk dipakai _get_norm_value_df(). - """ self.logger.info("\n" + "=" * 70) self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS (per indicator per year)") self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}") @@ -552,20 +536,17 @@ class FoodSecurityAggregator: df = df.drop(columns=["_is_sdg_kw"]) self.df = df - # Build compact lookup (unique indicator_id x year x framework) self._ind_year_framework = ( self.df[["indicator_id", "year", "framework"]] .drop_duplicates() .reset_index(drop=True) ) - # Log distribusi fw_dist = self.df["framework"].value_counts() self.logger.info("\n Framework distribution (rows):") for fw, cnt in fw_dist.items(): self.logger.info(f" {fw:<6}: {cnt:,} rows") - # n_indicators per framework per year ind_fw_yr = ( self._ind_year_framework .groupby(["year", "framework"])["indicator_id"] @@ -574,9 +555,7 @@ class FoodSecurityAggregator: .rename(columns={"indicator_id": "n_indicators"}) .sort_values(["year", "framework"]) ) - self.logger.info( - f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}" - ) + self.logger.info(f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}") self.logger.info(" " + "-" * 30) for _, r in ind_fw_yr.iterrows(): self.logger.info( @@ -584,10 +563,6 @@ class FoodSecurityAggregator: ) def _count_framework_indicators(self, year: int, framework: str) -> int: - """ - Hitung jumlah indikator unik untuk framework tertentu di tahun tertentu. - Menggunakan _ind_year_framework yang dibangun di _assign_framework_labels(). - """ mask = ( (self._ind_year_framework["year"] == year) & (self._ind_year_framework["framework"] == framework) @@ -633,6 +608,46 @@ class FoodSecurityAggregator: return pd.concat(norm_parts, ignore_index=True) + # ========================================================================= + # METADATA BUILDER + # Menyesuaikan dengan signature: save_etl_metadata(client, metadata: dict) + # dan skema etl_metadata: source_class, table_name, execution_timestamp, + # duration_seconds, rows_fetched, rows_transformed, rows_loaded, + # completeness_pct, config_snapshot, validation_metrics + # ========================================================================= + + def _build_etl_metadata( + self, + table_name: str, + rows_loaded: int, + start_time: datetime, + end_time: datetime, + status: str, + error_msg: str = None, + ) -> dict: + duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0 + return { + "source_class" : "FoodSecurityAggregator", + "table_name" : table_name, + "execution_timestamp": start_time or end_time, + "duration_seconds" : round(duration, 4), + "rows_fetched" : rows_loaded, + "rows_transformed" : rows_loaded, + "rows_loaded" : rows_loaded, + "completeness_pct" : 100.0 if status == "success" else 0.0, + "config_snapshot" : json.dumps({ + "layer" : "gold", + "write_disposition" : "WRITE_TRUNCATE", + "normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY, + "performance_threshold" : PERFORMANCE_THRESHOLD, + "status" : status, + }), + "validation_metrics" : json.dumps({ + "status" : status, + "error_msg": error_msg or "", + }), + } + # ========================================================================= # STEP 2: agg_pillar_composite # ========================================================================= @@ -963,9 +978,6 @@ class FoodSecurityAggregator: parts = [] - # ------------------------------------------------------------------ - # Helper: hitung n_indicators per framework per year dari lookup - # ------------------------------------------------------------------ def _n_ind(year_val, framework_val): return self._count_framework_indicators(year_val, framework_val) @@ -995,10 +1007,8 @@ class FoodSecurityAggregator: "asean_norm" : "framework_norm", "n_countries" : "n_countries_with_data", }) - mdgs_pre["n_indicators"] = mdgs_pre["year"].apply( - lambda y: _n_ind(y, "MDGs") - ) - mdgs_pre["framework"] = "MDGs" + mdgs_pre["n_indicators"] = mdgs_pre["year"].apply(lambda y: _n_ind(y, "MDGs")) + mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) # MDGs mixed @@ -1021,9 +1031,7 @@ class FoodSecurityAggregator: std_norm =("country_norm", "std"), n_countries_with_data=("country_id", "count"), ).reset_index() - asean_mdgs["n_indicators"] = asean_mdgs["year"].apply( - lambda y: _n_ind(y, "MDGs") - ) + asean_mdgs["n_indicators"] = asean_mdgs["year"].apply(lambda y: _n_ind(y, "MDGs")) if not NORMALIZE_FRAMEWORKS_JOINTLY: asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) asean_mdgs["framework"] = "MDGs" @@ -1049,9 +1057,7 @@ class FoodSecurityAggregator: std_norm =("country_norm", "std"), n_countries_with_data=("country_id", "count"), ).reset_index() - asean_sdgs["n_indicators"] = asean_sdgs["year"].apply( - lambda y: _n_ind(y, "SDGs") - ) + asean_sdgs["n_indicators"] = asean_sdgs["year"].apply(lambda y: _n_ind(y, "SDGs")) if not NORMALIZE_FRAMEWORKS_JOINTLY: asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) asean_sdgs["framework"] = "SDGs" @@ -1067,9 +1073,7 @@ class FoodSecurityAggregator: df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) df = add_yoy(df, ["framework"], "framework_score_1_100") - # performance_status - df["performance_status"] = df["framework_score_1_100"].apply(_performance_status) - + df["performance_status"] = df["framework_score_1_100"].apply(_performance_status) df["year"] = df["year"].astype(int) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) @@ -1079,7 +1083,6 @@ class FoodSecurityAggregator: self._validate_mdgs_equals_total(df, level="asean") - # Log performance summary self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):") for fw in df["framework"].unique(): sub = df[df["framework"] == fw].sort_values("year") @@ -1137,8 +1140,7 @@ class FoodSecurityAggregator: score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) status_by_year = dict(zip(asean_total["year"].astype(int), asean_total["performance_status"].astype(str))) - - country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() + country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() records = [] @@ -1149,7 +1151,6 @@ class FoodSecurityAggregator: yoy = row["year_over_year_change"] yoy_val = float(yoy) if pd.notna(yoy) else None - # n_indicators per framework per year (dari lookup) n_mdg = self._count_framework_indicators(yr, "MDGs") n_sdg = self._count_framework_indicators(yr, "SDGs") n_total_ind = int( @@ -1160,8 +1161,7 @@ class FoodSecurityAggregator: prev_score = score_by_year.get(yr - 1, None) prev_status = status_by_year.get(yr - 1, "N/A") - - yoy_pct = ( + yoy_pct = ( (yoy_val / prev_score * 100) if (yoy_val is not None and prev_score is not None and prev_score != 0) else None @@ -1186,8 +1186,8 @@ class FoodSecurityAggregator: yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) if not yr_country_yoy.empty: - best_idx = yr_country_yoy["year_over_year_change"].idxmax() - worst_idx = yr_country_yoy["year_over_year_change"].idxmin() + best_idx = yr_country_yoy["year_over_year_change"].idxmax() + worst_idx = yr_country_yoy["year_over_year_change"].idxmin() most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) @@ -1301,8 +1301,8 @@ class FoodSecurityAggregator: yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) @@ -1419,63 +1419,112 @@ class FoodSecurityAggregator: status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" self.logger.info(f" -> {status} (n_checked={len(check)})") + def _build_etl_metadata( + self, + table_name: str, + rows_loaded: int, + start_time: datetime, + end_time: datetime, + status: str, + error_msg: str = None, + ) -> dict: + """ + Susun dict metadata sesuai signature save_etl_metadata(client, metadata: dict) + dan kolom skema etl_metadata di bigquery_helpers.py: + source_class, table_name, execution_timestamp, duration_seconds, + rows_fetched, rows_transformed, rows_loaded, completeness_pct, + config_snapshot, validation_metrics + """ + duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0 + return { + "source_class" : "FoodSecurityAggregator", + "table_name" : table_name, + "execution_timestamp": start_time or end_time, + "duration_seconds" : round(duration, 4), + "rows_fetched" : rows_loaded, + "rows_transformed" : rows_loaded, + "rows_loaded" : rows_loaded, + "completeness_pct" : 100.0 if status == "success" else 0.0, + "config_snapshot" : json.dumps({ + "layer" : "gold", + "write_disposition" : "WRITE_TRUNCATE", + "normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY, + "performance_threshold" : PERFORMANCE_THRESHOLD, + "status" : status, + }), + "validation_metrics" : json.dumps({ + "status" : status, + "error_msg": error_msg or "", + }), + } + def _finalize(self, table_name: str, rows_loaded: int): """ - Tandai tabel sebagai sukses, catat ke etl_logs dan etl_metadata. - start_time diambil dari self.load_metadata yang di-set di awal tiap step. + Tandai tabel sukses. Catat ke etl_logs dan etl_metadata. + Pemanggilan: save_etl_metadata(client, metadata_dict) """ end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") self.load_metadata[table_name].update({ "rows_loaded": rows_loaded, - "status": "success", - "end_time": end_time, + "status" : "success", + "end_time" : end_time, }) - # Catat ke etl_logs (ringkasan singkat) log_update(self.client, "DW", table_name, "full_load", rows_loaded) - # Catat ke etl_metadata (detail: durasi, status, rows) - save_etl_metadata( - client = self.client, - table_name = table_name, - layer = "gold", - rows_loaded= rows_loaded, - start_time = start_time, - end_time = end_time, - status = "success", - ) + try: + save_etl_metadata( + self.client, + self._build_etl_metadata( + table_name = table_name, + rows_loaded = rows_loaded, + start_time = start_time, + end_time = end_time, + status = "success", + ) + ) + except Exception as meta_err: + # Error metadata tidak boleh menghentikan pipeline + self.logger.warning( + f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}" + ) self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") def _fail(self, table_name: str, error: Exception): """ - Tandai tabel sebagai gagal, catat ke etl_logs dan etl_metadata beserta pesan error. + Tandai tabel gagal. Catat ke etl_logs dan etl_metadata. + Pemanggilan: save_etl_metadata(client, metadata_dict) """ end_time = datetime.now() start_time = self.load_metadata[table_name].get("start_time") error_msg = str(error) self.load_metadata[table_name].update({ - "status": "failed", + "status" : "failed", "end_time": end_time, }) - # Catat ke etl_logs log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg) - # Catat ke etl_metadata dengan status failed + pesan error - save_etl_metadata( - client = self.client, - table_name = table_name, - layer = "gold", - rows_loaded= 0, - start_time = start_time, - end_time = end_time, - status = "failed", - error_msg = error_msg, - ) + try: + save_etl_metadata( + self.client, + self._build_etl_metadata( + table_name = table_name, + rows_loaded = 0, + start_time = start_time, + end_time = end_time, + status = "failed", + error_msg = error_msg, + ) + ) + except Exception as meta_err: + self.logger.warning( + f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}" + ) self.logger.error(f" [FAIL] {table_name}: {error_msg}")