add metadata aggregate

This commit is contained in:
Debby
2026-04-16 08:14:10 +07:00
parent 74be63226a
commit 40528766bd

View File

@@ -252,7 +252,6 @@ def _build_overview_narrative(
trend_word = "improvement" if yoy_val >= 0 else "decline" trend_word = "improvement" if yoy_val >= 0 else "decline"
pct_clause = f", representing a {abs_pct:.2f}% {trend_word} year-over-year" pct_clause = f", representing a {abs_pct:.2f}% {trend_word} year-over-year"
# Note if performance status changed
status_change = "" status_change = ""
if prev_performance_status not in ("N/A", None) and prev_performance_status != performance_status: if prev_performance_status not in ("N/A", None) and prev_performance_status != performance_status:
status_change = ( status_change = (
@@ -444,7 +443,6 @@ class FoodSecurityAggregator:
self.sdgs_start_year = None self.sdgs_start_year = None
# Lookup: (indicator_id, year) -> framework label # Lookup: (indicator_id, year) -> framework label
# Dibangun di _assign_framework_labels(), dipakai di _count_framework_indicators()
self._ind_year_framework: pd.DataFrame = None self._ind_year_framework: pd.DataFrame = None
# ========================================================================= # =========================================================================
@@ -495,11 +493,9 @@ class FoodSecurityAggregator:
# ========================================================================= # =========================================================================
# STEP 1b: Detect sdgs_start_year + assign framework per (indicator, year) # STEP 1b: Detect sdgs_start_year + assign framework per (indicator, year)
# Konsisten dengan logika di bigquery_aggraget_fact_selected_layer.py
# ========================================================================= # =========================================================================
def _detect_sdgs_start_year(self) -> int: def _detect_sdgs_start_year(self) -> int:
"""Deteksi sdgs_start_year dari kehadiran FIES di data (metode eksplisit)."""
fies_rows = self.df[ fies_rows = self.df[
self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER) self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER)
] ]
@@ -508,7 +504,6 @@ class FoodSecurityAggregator:
self.logger.info(f" [FIES explicit] sdgs_start_year = {sdgs_start}") self.logger.info(f" [FIES explicit] sdgs_start_year = {sdgs_start}")
return sdgs_start return sdgs_start
# Fallback: gap terbesar pada distribusi min_year
ind_min_year = ( ind_min_year = (
self.df.groupby("indicator_id")["year"] self.df.groupby("indicator_id")["year"]
.min().reset_index().rename(columns={"year": "min_year"}) .min().reset_index().rename(columns={"year": "min_year"})
@@ -528,17 +523,6 @@ class FoodSecurityAggregator:
return int(y_after) return int(y_after)
def _assign_framework_labels(self): def _assign_framework_labels(self):
"""
Buat lookup table _ind_year_framework: DataFrame(indicator_id, year, framework).
Aturan (identik dengan IndicatorNormAggregator._assign_framework):
- Indikator TIDAK di SDG_ONLY_KEYWORDS -> selalu "MDGs"
- Indikator DI SDG_ONLY_KEYWORDS:
year < sdgs_start_year -> "MDGs"
year >= sdgs_start_year -> "SDGs"
Juga attach kolom 'framework' ke self.df untuk dipakai _get_norm_value_df().
"""
self.logger.info("\n" + "=" * 70) self.logger.info("\n" + "=" * 70)
self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS (per indicator per year)") self.logger.info("STEP 1b: ASSIGN FRAMEWORK LABELS (per indicator per year)")
self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}") self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
@@ -552,20 +536,17 @@ class FoodSecurityAggregator:
df = df.drop(columns=["_is_sdg_kw"]) df = df.drop(columns=["_is_sdg_kw"])
self.df = df self.df = df
# Build compact lookup (unique indicator_id x year x framework)
self._ind_year_framework = ( self._ind_year_framework = (
self.df[["indicator_id", "year", "framework"]] self.df[["indicator_id", "year", "framework"]]
.drop_duplicates() .drop_duplicates()
.reset_index(drop=True) .reset_index(drop=True)
) )
# Log distribusi
fw_dist = self.df["framework"].value_counts() fw_dist = self.df["framework"].value_counts()
self.logger.info("\n Framework distribution (rows):") self.logger.info("\n Framework distribution (rows):")
for fw, cnt in fw_dist.items(): for fw, cnt in fw_dist.items():
self.logger.info(f" {fw:<6}: {cnt:,} rows") self.logger.info(f" {fw:<6}: {cnt:,} rows")
# n_indicators per framework per year
ind_fw_yr = ( ind_fw_yr = (
self._ind_year_framework self._ind_year_framework
.groupby(["year", "framework"])["indicator_id"] .groupby(["year", "framework"])["indicator_id"]
@@ -574,9 +555,7 @@ class FoodSecurityAggregator:
.rename(columns={"indicator_id": "n_indicators"}) .rename(columns={"indicator_id": "n_indicators"})
.sort_values(["year", "framework"]) .sort_values(["year", "framework"])
) )
self.logger.info( self.logger.info(f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}")
f"\n {'Year':<6} {'Framework':<8} {'n_indicators'}"
)
self.logger.info(" " + "-" * 30) self.logger.info(" " + "-" * 30)
for _, r in ind_fw_yr.iterrows(): for _, r in ind_fw_yr.iterrows():
self.logger.info( self.logger.info(
@@ -584,10 +563,6 @@ class FoodSecurityAggregator:
) )
def _count_framework_indicators(self, year: int, framework: str) -> int: def _count_framework_indicators(self, year: int, framework: str) -> int:
"""
Hitung jumlah indikator unik untuk framework tertentu di tahun tertentu.
Menggunakan _ind_year_framework yang dibangun di _assign_framework_labels().
"""
mask = ( mask = (
(self._ind_year_framework["year"] == year) & (self._ind_year_framework["year"] == year) &
(self._ind_year_framework["framework"] == framework) (self._ind_year_framework["framework"] == framework)
@@ -633,6 +608,46 @@ class FoodSecurityAggregator:
return pd.concat(norm_parts, ignore_index=True) return pd.concat(norm_parts, ignore_index=True)
# =========================================================================
# METADATA BUILDER
# Menyesuaikan dengan signature: save_etl_metadata(client, metadata: dict)
# dan skema etl_metadata: source_class, table_name, execution_timestamp,
# duration_seconds, rows_fetched, rows_transformed, rows_loaded,
# completeness_pct, config_snapshot, validation_metrics
# =========================================================================
def _build_etl_metadata(
self,
table_name: str,
rows_loaded: int,
start_time: datetime,
end_time: datetime,
status: str,
error_msg: str = None,
) -> dict:
duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0
return {
"source_class" : "FoodSecurityAggregator",
"table_name" : table_name,
"execution_timestamp": start_time or end_time,
"duration_seconds" : round(duration, 4),
"rows_fetched" : rows_loaded,
"rows_transformed" : rows_loaded,
"rows_loaded" : rows_loaded,
"completeness_pct" : 100.0 if status == "success" else 0.0,
"config_snapshot" : json.dumps({
"layer" : "gold",
"write_disposition" : "WRITE_TRUNCATE",
"normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY,
"performance_threshold" : PERFORMANCE_THRESHOLD,
"status" : status,
}),
"validation_metrics" : json.dumps({
"status" : status,
"error_msg": error_msg or "",
}),
}
# ========================================================================= # =========================================================================
# STEP 2: agg_pillar_composite # STEP 2: agg_pillar_composite
# ========================================================================= # =========================================================================
@@ -963,9 +978,6 @@ class FoodSecurityAggregator:
parts = [] parts = []
# ------------------------------------------------------------------
# Helper: hitung n_indicators per framework per year dari lookup
# ------------------------------------------------------------------
def _n_ind(year_val, framework_val): def _n_ind(year_val, framework_val):
return self._count_framework_indicators(year_val, framework_val) return self._count_framework_indicators(year_val, framework_val)
@@ -995,9 +1007,7 @@ class FoodSecurityAggregator:
"asean_norm" : "framework_norm", "asean_norm" : "framework_norm",
"n_countries" : "n_countries_with_data", "n_countries" : "n_countries_with_data",
}) })
mdgs_pre["n_indicators"] = mdgs_pre["year"].apply( mdgs_pre["n_indicators"] = mdgs_pre["year"].apply(lambda y: _n_ind(y, "MDGs"))
lambda y: _n_ind(y, "MDGs")
)
mdgs_pre["framework"] = "MDGs" mdgs_pre["framework"] = "MDGs"
parts.append(mdgs_pre) parts.append(mdgs_pre)
@@ -1021,9 +1031,7 @@ class FoodSecurityAggregator:
std_norm =("country_norm", "std"), std_norm =("country_norm", "std"),
n_countries_with_data=("country_id", "count"), n_countries_with_data=("country_id", "count"),
).reset_index() ).reset_index()
asean_mdgs["n_indicators"] = asean_mdgs["year"].apply( asean_mdgs["n_indicators"] = asean_mdgs["year"].apply(lambda y: _n_ind(y, "MDGs"))
lambda y: _n_ind(y, "MDGs")
)
if not NORMALIZE_FRAMEWORKS_JOINTLY: if not NORMALIZE_FRAMEWORKS_JOINTLY:
asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"])
asean_mdgs["framework"] = "MDGs" asean_mdgs["framework"] = "MDGs"
@@ -1049,9 +1057,7 @@ class FoodSecurityAggregator:
std_norm =("country_norm", "std"), std_norm =("country_norm", "std"),
n_countries_with_data=("country_id", "count"), n_countries_with_data=("country_id", "count"),
).reset_index() ).reset_index()
asean_sdgs["n_indicators"] = asean_sdgs["year"].apply( asean_sdgs["n_indicators"] = asean_sdgs["year"].apply(lambda y: _n_ind(y, "SDGs"))
lambda y: _n_ind(y, "SDGs")
)
if not NORMALIZE_FRAMEWORKS_JOINTLY: if not NORMALIZE_FRAMEWORKS_JOINTLY:
asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"])
asean_sdgs["framework"] = "SDGs" asean_sdgs["framework"] = "SDGs"
@@ -1067,9 +1073,7 @@ class FoodSecurityAggregator:
df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger)
df = add_yoy(df, ["framework"], "framework_score_1_100") df = add_yoy(df, ["framework"], "framework_score_1_100")
# performance_status
df["performance_status"] = df["framework_score_1_100"].apply(_performance_status) df["performance_status"] = df["framework_score_1_100"].apply(_performance_status)
df["year"] = df["year"].astype(int) df["year"] = df["year"].astype(int)
df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger)
df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger)
@@ -1079,7 +1083,6 @@ class FoodSecurityAggregator:
self._validate_mdgs_equals_total(df, level="asean") self._validate_mdgs_equals_total(df, level="asean")
# Log performance summary
self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):") self.logger.info(f"\n performance_status summary (threshold={PERFORMANCE_THRESHOLD}):")
for fw in df["framework"].unique(): for fw in df["framework"].unique():
sub = df[df["framework"] == fw].sort_values("year") sub = df[df["framework"] == fw].sort_values("year")
@@ -1137,7 +1140,6 @@ class FoodSecurityAggregator:
score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float)))
status_by_year = dict(zip(asean_total["year"].astype(int), asean_total["performance_status"].astype(str))) status_by_year = dict(zip(asean_total["year"].astype(int), asean_total["performance_status"].astype(str)))
country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy()
records = [] records = []
@@ -1149,7 +1151,6 @@ class FoodSecurityAggregator:
yoy = row["year_over_year_change"] yoy = row["year_over_year_change"]
yoy_val = float(yoy) if pd.notna(yoy) else None yoy_val = float(yoy) if pd.notna(yoy) else None
# n_indicators per framework per year (dari lookup)
n_mdg = self._count_framework_indicators(yr, "MDGs") n_mdg = self._count_framework_indicators(yr, "MDGs")
n_sdg = self._count_framework_indicators(yr, "SDGs") n_sdg = self._count_framework_indicators(yr, "SDGs")
n_total_ind = int( n_total_ind = int(
@@ -1160,7 +1161,6 @@ class FoodSecurityAggregator:
prev_score = score_by_year.get(yr - 1, None) prev_score = score_by_year.get(yr - 1, None)
prev_status = status_by_year.get(yr - 1, "N/A") prev_status = status_by_year.get(yr - 1, "N/A")
yoy_pct = ( yoy_pct = (
(yoy_val / prev_score * 100) (yoy_val / prev_score * 100)
if (yoy_val is not None and prev_score is not None and prev_score != 0) if (yoy_val is not None and prev_score is not None and prev_score != 0)
@@ -1419,10 +1419,49 @@ class FoodSecurityAggregator:
status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}"
self.logger.info(f" -> {status} (n_checked={len(check)})") self.logger.info(f" -> {status} (n_checked={len(check)})")
def _build_etl_metadata(
self,
table_name: str,
rows_loaded: int,
start_time: datetime,
end_time: datetime,
status: str,
error_msg: str = None,
) -> dict:
"""
Susun dict metadata sesuai signature save_etl_metadata(client, metadata: dict)
dan kolom skema etl_metadata di bigquery_helpers.py:
source_class, table_name, execution_timestamp, duration_seconds,
rows_fetched, rows_transformed, rows_loaded, completeness_pct,
config_snapshot, validation_metrics
"""
duration = (end_time - start_time).total_seconds() if (start_time and end_time) else 0.0
return {
"source_class" : "FoodSecurityAggregator",
"table_name" : table_name,
"execution_timestamp": start_time or end_time,
"duration_seconds" : round(duration, 4),
"rows_fetched" : rows_loaded,
"rows_transformed" : rows_loaded,
"rows_loaded" : rows_loaded,
"completeness_pct" : 100.0 if status == "success" else 0.0,
"config_snapshot" : json.dumps({
"layer" : "gold",
"write_disposition" : "WRITE_TRUNCATE",
"normalize_frameworks_jointly": NORMALIZE_FRAMEWORKS_JOINTLY,
"performance_threshold" : PERFORMANCE_THRESHOLD,
"status" : status,
}),
"validation_metrics" : json.dumps({
"status" : status,
"error_msg": error_msg or "",
}),
}
def _finalize(self, table_name: str, rows_loaded: int): def _finalize(self, table_name: str, rows_loaded: int):
""" """
Tandai tabel sebagai sukses, catat ke etl_logs dan etl_metadata. Tandai tabel sukses. Catat ke etl_logs dan etl_metadata.
start_time diambil dari self.load_metadata yang di-set di awal tiap step. Pemanggilan: save_etl_metadata(client, metadata_dict)
""" """
end_time = datetime.now() end_time = datetime.now()
start_time = self.load_metadata[table_name].get("start_time") start_time = self.load_metadata[table_name].get("start_time")
@@ -1433,25 +1472,31 @@ class FoodSecurityAggregator:
"end_time" : end_time, "end_time" : end_time,
}) })
# Catat ke etl_logs (ringkasan singkat)
log_update(self.client, "DW", table_name, "full_load", rows_loaded) log_update(self.client, "DW", table_name, "full_load", rows_loaded)
# Catat ke etl_metadata (detail: durasi, status, rows) try:
save_etl_metadata( save_etl_metadata(
client = self.client, self.client,
self._build_etl_metadata(
table_name = table_name, table_name = table_name,
layer = "gold",
rows_loaded = rows_loaded, rows_loaded = rows_loaded,
start_time = start_time, start_time = start_time,
end_time = end_time, end_time = end_time,
status = "success", status = "success",
) )
)
except Exception as meta_err:
# Error metadata tidak boleh menghentikan pipeline
self.logger.warning(
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
def _fail(self, table_name: str, error: Exception): def _fail(self, table_name: str, error: Exception):
""" """
Tandai tabel sebagai gagal, catat ke etl_logs dan etl_metadata beserta pesan error. Tandai tabel gagal. Catat ke etl_logs dan etl_metadata.
Pemanggilan: save_etl_metadata(client, metadata_dict)
""" """
end_time = datetime.now() end_time = datetime.now()
start_time = self.load_metadata[table_name].get("start_time") start_time = self.load_metadata[table_name].get("start_time")
@@ -1462,20 +1507,24 @@ class FoodSecurityAggregator:
"end_time": end_time, "end_time": end_time,
}) })
# Catat ke etl_logs
log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg) log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg)
# Catat ke etl_metadata dengan status failed + pesan error try:
save_etl_metadata( save_etl_metadata(
client = self.client, self.client,
self._build_etl_metadata(
table_name = table_name, table_name = table_name,
layer = "gold",
rows_loaded = 0, rows_loaded = 0,
start_time = start_time, start_time = start_time,
end_time = end_time, end_time = end_time,
status = "failed", status = "failed",
error_msg = error_msg, error_msg = error_msg,
) )
)
except Exception as meta_err:
self.logger.warning(
f" [METADATA WARNING] Gagal simpan etl_metadata untuk {table_name}: {meta_err}"
)
self.logger.error(f" [FAIL] {table_name}: {error_msg}") self.logger.error(f" [FAIL] {table_name}: {error_msg}")