From 60302689245141b6955fbb313a62de09675153b8 Mon Sep 17 00:00:00 2001 From: Debby Date: Thu, 2 Apr 2026 17:43:31 +0700 Subject: [PATCH] salah file --- scripts/bigquery_aggregate_layer.py | 312 +++-- scripts/bigquery_cleaned_layer.py | 1814 ++++++++------------------- 2 files changed, 719 insertions(+), 1407 deletions(-) diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index e66f2cb..9cbc343 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -2,19 +2,32 @@ BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION Semua agregasi pakai norm_value dari _get_norm_value_df() -UPDATED: -- _classify_indicators() membaca kolom 'framework' langsung dari - fact_asean_food_security_selected (sudah di-assign di analytical_layer - berdasarkan SDG_INDICATOR_KEYWORDS + actual_start_year). -- Kolom 'condition' (good/moderate/bad) ditambahkan ke semua tabel agregasi: - * agg_pillar_composite - * agg_pillar_by_country - * agg_framework_by_country - * agg_framework_asean - Threshold fixed absolute (skala 1-100, direction-aware): - bad : score < 40 - moderate : 40 <= score <= 60 - good : score > 60 +PERBAIKAN (vs versi sebelumnya): +───────────────────────────────────────────────────────────────────────────── +1. NORMALIZE_FRAMEWORKS_JOINTLY dihapus. + Setelah perbaikan di analytical_layer, norm_value_1_100 sudah dihitung + SEKALI per indikator dari seluruh data (semua tahun, semua negara). + Tidak ada lagi rescaling ulang per-framework di layer ini. + Semua framework (MDGs, SDGs, Total) menggunakan norm_value yang SAMA + sebagai basis, sehingga skor mereka berada pada skala yang setara. + +2. _get_norm_value_df() DISEDERHANAKAN. + Fungsi ini sekarang hanya membaca kolom norm_value_1_100 yang sudah ada + di fact_asean_food_security_selected (hasil dari analytical_layer), + kemudian memetakan ke skala 0-1 untuk keperluan agregasi internal. + TIDAK ada lagi normalisasi ulang per indikator di sini. + +3. global_minmax() TETAP DIGUNAKAN untuk mengubah rata-rata norm (0-1) menjadi + skor 1-100 di level agregasi (pillar / country / asean). + Ini adalah rescaling level AGREGAT (bukan level indikator), sehingga masih + valid dan tidak menimbulkan bias komparabilitas. + +4. Framework MDGs dan SDGs sekarang comparable: + - Jika skor SDGs < skor MDGs → memang karena indikator SDGs mengukur + dimensi deprivasi yang lebih dalam (substantif), bukan artefak teknis. + - Log diagnostik ditambahkan untuk memverifikasi ini. + +5. Kolom 'condition' (good/moderate/bad) TETAP dengan threshold yang sama. Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_pillar_composite @@ -57,10 +70,7 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({ "positive", "higher_better", "higher_is_better", }) -NORMALIZE_FRAMEWORKS_JOINTLY = False - # Threshold kondisi — fixed absolute, skala 1-100 -# Konsisten dengan THRESHOLD_BAD / THRESHOLD_GOOD di analytical_layer THRESHOLD_BAD = 40.0 THRESHOLD_GOOD = 60.0 @@ -118,6 +128,11 @@ def _should_invert(direction: str, logger=None, context: str = "") -> bool: def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: + """ + Rescale series ke rentang [lo, hi]. + Digunakan untuk mengubah norm agregat (0-1) menjadi skor 1-100 di level + pillar / country / asean. Bukan untuk normalisasi indikator mentah. + """ values = series.dropna().values if len(values) == 0: return pd.Series(np.nan, index=series.index) @@ -172,16 +187,11 @@ def check_and_dedup( def add_condition_column(df: pd.DataFrame, score_col: str) -> pd.DataFrame: - """ - Tambahkan kolom 'condition' berdasarkan score_col. - Threshold: bad < 40, moderate 40-60, good > 60 (skala 1-100). - """ df['condition'] = df[score_col].apply(assign_condition) return df def log_condition_summary(df: pd.DataFrame, context: str, logger) -> None: - """Log distribusi kondisi untuk verifikasi.""" dist = df['condition'].value_counts() logger.info( f" Condition distribution ({context}): " + @@ -190,7 +200,7 @@ def log_condition_summary(df: pd.DataFrame, context: str, logger) -> None: # ============================================================================= -# NARRATIVE BUILDER FUNCTIONS +# NARRATIVE BUILDER FUNCTIONS (tidak berubah) # ============================================================================= def _fmt_score(score) -> str: @@ -426,6 +436,8 @@ class FoodSecurityAggregator: "indicator_id", "indicator_name", "direction", "framework", "pillar_id", "pillar_name", "time_id", "year", "value", + # PERBAIKAN: norm_value_1_100 wajib ada (hasil analytical_layer) + "norm_value_1_100", } missing_cols = required_cols - set(self.df.columns) if missing_cols: @@ -434,12 +446,13 @@ class FoodSecurityAggregator: f"Pastikan pipeline dijalankan berurutan:\n" f" 1. bigquery_cleaned_layer.py\n" f" 2. bigquery_dimensional_model.py\n" - f" 3. bigquery_analytical_layer.py\n" + f" 3. bigquery_analytical_layer.py ← harus dijalankan dulu\n" f" 4. bigquery_analysis_layer.py (file ini)" ) - self.df["direction"] = self.df["direction"].fillna("positive") - self.df["framework"] = self.df["framework"].fillna("MDGs") + self.df["direction"] = self.df["direction"].fillna("positive") + self.df["framework"] = self.df["framework"].fillna("MDGs") + self.df["norm_value_1_100"] = self.df["norm_value_1_100"].astype(float) dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() self.logger.info(f"\n Distribusi direction per indikator:") @@ -458,6 +471,45 @@ class FoodSecurityAggregator: f"Tahun: {int(self.df['year'].min())}-{int(self.df['year'].max())}" ) + # Diagnostik: cek komparabilitas norm antar framework + self._log_norm_comparability_diagnostics() + + def _log_norm_comparability_diagnostics(self): + """ + Log diagnostik untuk memverifikasi bahwa norm_value_1_100 sudah comparable + antar framework setelah perbaikan di analytical_layer. + """ + self.logger.info(f"\n [DIAGNOSTIK] Komparabilitas norm_value_1_100 antar framework:") + self.logger.info(f" {'─'*60}") + + fw_stats = ( + self.df.groupby('framework')['norm_value_1_100'] + .agg(['mean', 'median', 'std', 'min', 'max']) + .round(2) + ) + for fw, row in fw_stats.iterrows(): + self.logger.info( + f" {fw:<8} mean={row['mean']:>6.2f} median={row['median']:>6.2f} " + f"std={row['std']:>5.2f} range=[{row['min']:.2f},{row['max']:.2f}]" + ) + + mdgs_mean = self.df[self.df['framework'] == 'MDGs']['norm_value_1_100'].mean() + sdgs_mean = self.df[self.df['framework'] == 'SDGs']['norm_value_1_100'].mean() + gap = mdgs_mean - sdgs_mean + + if abs(gap) > 15: + self.logger.info( + f"\n [INFO] Gap MDGs-SDGs = {gap:.2f} poin." + f"\n Ini adalah perbedaan SUBSTANTIF (bukan artefak normalisasi):" + f"\n Indikator SDGs mengukur deprivasi yang lebih dalam" + f"\n (FIES, stunting, wasting, anaemia) vs indikator MDGs." + f"\n Gap ini valid untuk dilaporkan sebagai temuan analisis." + ) + else: + self.logger.info( + f"\n [OK] Gap MDGs-SDGs = {gap:.2f} poin — dalam batas wajar." + ) + # ========================================================================= # STEP 1b: Klasifikasi indikator # ========================================================================= @@ -474,8 +526,6 @@ class FoodSecurityAggregator: self.df[self.df["framework"] == "SDGs"]["indicator_id"].unique().tolist() ) - # sdgs_start_year: ambil dari proxy SDGs-only (FIES/anaemia) - # Konsisten dengan cara analytical_layer mendeteksinya _PROXY_KW = frozenset(['food insecurity', 'anemia', 'anaemia']) proxy_mask = ( (self.df["framework"] == "SDGs") & @@ -492,7 +542,6 @@ class FoodSecurityAggregator: f"(dari proxy FIES/anaemia di tabel)" ) else: - # Fallback: min year dari semua SDGs rows sdgs_rows = self.df[self.df["framework"] == "SDGs"] if not sdgs_rows.empty: self.sdgs_start_year = int(sdgs_rows["year"].min()) @@ -520,39 +569,48 @@ class FoodSecurityAggregator: self.logger.info(f" [{int(row['indicator_id'])}] {row['indicator_name']}") # ========================================================================= - # CORE HELPER: normalisasi 0-1 per indikator (untuk composite score) + # CORE HELPER: _get_norm_value_df() + # ========================================================================= + # PERBAIKAN: + # Fungsi ini TIDAK lagi melakukan normalisasi ulang per indikator. + # Kolom norm_value_1_100 sudah dihitung sekali di analytical_layer + # dengan referensi global (semua tahun, semua negara, per indikator). + # + # Yang dilakukan di sini hanya: + # 1. Membaca norm_value_1_100 dari df + # 2. Mengubah skala 1-100 → 0-1 (untuk keperluan rata-rata agregat) + # dengan rumus linear: norm_0_1 = (norm_1_100 - 1) / 99 + # + # Rescaling agregat (0-1 → 1-100) tetap dilakukan via global_minmax() + # di masing-masing fungsi calc_* untuk menghasilkan skor level pillar/country/asean. # ========================================================================= def _get_norm_value_df(self) -> pd.DataFrame: - norm_parts = [] - for ind_id, grp in self.df.groupby("indicator_id"): - grp = grp.copy() - direction = str(grp["direction"].iloc[0]) - do_invert = _should_invert(direction, self.logger, context=f"indicator_id={ind_id}") - valid_mask = grp["value"].notna() - n_valid = valid_mask.sum() + """ + Mengembalikan df dengan kolom 'norm_value' (skala 0-1) yang diturunkan + dari norm_value_1_100 (sudah ada di source, dihitung di analytical_layer). - if n_valid < 2: - grp["norm_value"] = np.nan - norm_parts.append(grp) - continue + Transformasi: norm_value = (norm_value_1_100 - 1) / 99 + Ini adalah transformasi LINEAR — tidak mengubah urutan relatif antar indikator, + negara, atau tahun. Komparabilitas lintas framework tetap terjaga. + """ + df = self.df.copy() - raw = grp.loc[valid_mask, "value"].values - v_min, v_max = raw.min(), raw.max() - normed = np.full(len(grp), np.nan) + # Konversi 1-100 → 0-1 secara linear + df["norm_value"] = np.where( + df["norm_value_1_100"].notna(), + (df["norm_value_1_100"] - 1.0) / 99.0, + np.nan + ) - if v_min == v_max: - normed[valid_mask.values] = 0.5 - else: - normed[valid_mask.values] = (raw - v_min) / (v_max - v_min) + n_null = df["norm_value"].isna().sum() + n_valid = df["norm_value"].notna().sum() + self.logger.debug( + f" _get_norm_value_df: {n_valid:,} valid | {n_null:,} null " + f"(dari norm_value_1_100 analytical_layer)" + ) - if do_invert: - normed = np.where(np.isnan(normed), np.nan, 1.0 - normed) - - grp["norm_value"] = normed - norm_parts.append(grp) - - return pd.concat(norm_parts, ignore_index=True) + return df # ========================================================================= # STEP 2: agg_pillar_composite @@ -674,6 +732,16 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 4: agg_framework_by_country # ========================================================================= + # PERBAIKAN: + # - Flag NORMALIZE_FRAMEWORKS_JOINTLY dihapus. + # - Tidak ada lagi rescaling ulang per-framework di sini. + # - Semua framework (Total, MDGs, SDGs) menggunakan norm_value yang SAMA + # sebagai basis (sudah comparable dari analytical_layer). + # - global_minmax() hanya digunakan SEKALI untuk mengubah norm agregat + # (rata-rata norm_value per country-framework-year) menjadi skor 1-100 + # di level country-framework, menggunakan SATU POOL DATA BERSAMA. + # - Dengan ini, perbandingan skor MDGs vs SDGs per negara adalah valid. + # ========================================================================= def _calc_country_composite_inmemory(self) -> pd.DataFrame: df_normed = self._get_norm_value_df() @@ -707,12 +775,16 @@ class FoodSecurityAggregator: self.logger.info("\n" + "=" * 70) self.logger.info(f"STEP 4: {table_name}") self.logger.info("=" * 70) + self.logger.info( + " [PERBAIKAN] Semua framework di-aggregate dari norm_value yang SAMA." + "\n Tidak ada rescaling per-framework. Skor MDGs dan SDGs comparable." + ) country_composite = self._calc_country_composite_inmemory() df_normed = self._get_norm_value_df() parts = [] - # Layer TOTAL + # ── Layer TOTAL ─────────────────────────────────────────────────────── agg_total = ( country_composite[[ "country_id", "country_name", "year", @@ -727,8 +799,10 @@ class FoodSecurityAggregator: agg_total["framework"] = "Total" parts.append(agg_total) - # Layer MDGs pre-SDGs - pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() + # ── Layer MDGs pre-SDGs (tahun sebelum sdgs_start_year) ────────────── + pre_sdgs_rows = country_composite[ + country_composite["year"] < self.sdgs_start_year + ].copy() if not pre_sdgs_rows.empty: mdgs_pre = ( pre_sdgs_rows[[ @@ -744,7 +818,7 @@ class FoodSecurityAggregator: mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # Layer MDGs mixed (setelah SDGs masuk) + # ── Layer MDGs mixed (setelah SDGs masuk, hanya indikator MDGs) ────── if self.mdgs_indicator_ids: df_mdgs_mixed = df_normed[ (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & @@ -754,15 +828,17 @@ class FoodSecurityAggregator: agg_mdgs_mixed = ( df_mdgs_mixed .groupby(["country_id", "country_name", "year"]) - .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) .reset_index() ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) + # PERBAIKAN: rescale dari POOL GABUNGAN bersama SDGs (lihat bawah) agg_mdgs_mixed["framework"] = "MDGs" parts.append(agg_mdgs_mixed) - # Layer SDGs + # ── Layer SDGs (hanya indikator SDGs, mulai sdgs_start_year) ───────── if self.sdgs_indicator_ids: df_sdgs = df_normed[ (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & @@ -772,22 +848,40 @@ class FoodSecurityAggregator: agg_sdgs = ( df_sdgs .groupby(["country_id", "country_name", "year"]) - .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) .reset_index() ) - if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) agg_sdgs["framework"] = "SDGs" parts.append(agg_sdgs) df = pd.concat(parts, ignore_index=True) - if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) - if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + # PERBAIKAN: Rescale framework_score_1_100 dari SATU POOL BERSAMA + # untuk semua framework (MDGs mixed + SDGs) sekaligus. + # Ini memastikan skor 60 di MDGs dan skor 60 di SDGs memiliki makna + # yang sama: posisi relatif yang sama dalam distribusi gabungan. + mixed_mask = df["framework"].isin(["MDGs", "SDGs"]) + mixed_pre_mask = (df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year) + + # Rescale pre-SDGs MDGs dari pool Total (sudah dihitung) + # → sudah ada di agg_total (framework_score_1_100 = dari country_composite) + + # Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA + post_sdgs_mask = mixed_mask & ~mixed_pre_mask & df["framework_norm"].notna() + if post_sdgs_mask.any(): + df.loc[post_sdgs_mask, "framework_score_1_100"] = global_minmax( + df.loc[post_sdgs_mask, "framework_norm"] + ) df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) + + # Pastikan kolom framework_score_1_100 ada untuk semua baris + if "framework_score_1_100" not in df.columns: + df["framework_score_1_100"] = np.nan + df["rank_in_framework_year"] = ( df.groupby(["framework", "year"])["framework_score_1_100"] .rank(method="min", ascending=False) @@ -797,6 +891,9 @@ class FoodSecurityAggregator: df = add_condition_column(df, "framework_score_1_100") log_condition_summary(df, table_name, self.logger) + # Log diagnostik: bandingkan skor MDGs vs SDGs + self._log_framework_score_diagnostics(df, table_name) + df["country_id"] = df["country_id"].astype(int) df["year"] = df["year"].astype(int) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) @@ -828,6 +925,9 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 5: agg_framework_asean # ========================================================================= + # PERBAIKAN: Sama dengan framework_by_country — tidak ada rescaling terpisah + # per framework. MDGs mixed dan SDGs di-rescale dari satu pool bersama. + # ========================================================================= def calc_framework_asean(self) -> pd.DataFrame: table_name = "agg_framework_asean" @@ -835,6 +935,10 @@ class FoodSecurityAggregator: self.logger.info("\n" + "=" * 70) self.logger.info(f"STEP 5: {table_name}") self.logger.info("=" * 70) + self.logger.info( + " [PERBAIKAN] MDGs mixed + SDGs di-rescale dari SATU POOL BERSAMA." + "\n Skor ASEAN MDGs dan SDGs sekarang comparable." + ) df_normed = self._get_norm_value_df() country_composite = self._calc_country_composite_inmemory() @@ -847,14 +951,18 @@ class FoodSecurityAggregator: ) asean_overall = ( country_norm.groupby("year") - .agg(asean_norm=("country_norm", "mean"), std_norm=("country_norm", "std"), n_countries=("country_norm", "count")) + .agg( + asean_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries =("country_norm", "count") + ) .reset_index() ) asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) parts = [] - # Layer TOTAL + # ── Layer TOTAL ─────────────────────────────────────────────────────── total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() total_cols = total_cols.rename(columns={ "asean_score_1_100": "framework_score_1_100", @@ -866,7 +974,7 @@ class FoodSecurityAggregator: total_cols["framework"] = "Total" parts.append(total_cols) - # Layer MDGs pre-SDGs + # ── Layer MDGs pre-SDGs ─────────────────────────────────────────────── pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() if not pre_sdgs.empty: mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() @@ -884,7 +992,9 @@ class FoodSecurityAggregator: mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # Layer MDGs mixed + # ── Siapkan MDGs mixed dan SDGs untuk rescaling BERSAMA ─────────────── + mixed_parts = [] + if self.mdgs_indicator_ids: df_mdgs_mixed = df_normed[ (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & @@ -902,12 +1012,9 @@ class FoodSecurityAggregator: ).reset_index() n_ind_mdgs = df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") - if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) asean_mdgs["framework"] = "MDGs" - parts.append(asean_mdgs) + mixed_parts.append(asean_mdgs) - # Layer SDGs if self.sdgs_indicator_ids: df_sdgs = df_normed[ (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & @@ -925,23 +1032,25 @@ class FoodSecurityAggregator: ).reset_index() n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") - if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) asean_sdgs["framework"] = "SDGs" - parts.append(asean_sdgs) + mixed_parts.append(asean_sdgs) + + # PERBAIKAN: Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA + if mixed_parts: + df_mixed = pd.concat(mixed_parts, ignore_index=True) + df_mixed["framework_score_1_100"] = global_minmax(df_mixed["framework_norm"]) + parts.append(df_mixed) df = pd.concat(parts, ignore_index=True) - if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) - if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) - df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) df = add_yoy(df, ["framework"], "framework_score_1_100") df = add_condition_column(df, "framework_score_1_100") log_condition_summary(df, table_name, self.logger) + # Log diagnostik: bandingkan skor ASEAN MDGs vs SDGs + self._log_framework_score_diagnostics(df, table_name) + df["year"] = df["year"].astype(int) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) @@ -1164,9 +1273,34 @@ class FoodSecurityAggregator: return df # ========================================================================= - # HELPERS + # DIAGNOSTIK & VALIDASI # ========================================================================= + def _log_framework_score_diagnostics(self, df: pd.DataFrame, context: str): + """ + Log perbandingan rata-rata skor per framework. + Setelah perbaikan, gap antar framework mencerminkan perbedaan substantif, + bukan artefak normalisasi. + """ + self.logger.info(f"\n [DIAGNOSTIK] Rata-rata skor per framework ({context}):") + fw_means = df.groupby("framework")["framework_score_1_100"].agg(['mean', 'min', 'max']).round(2) + for fw, row in fw_means.iterrows(): + self.logger.info( + f" {fw:<8} mean={row['mean']:>6.2f} " + f"range=[{row['min']:.2f}, {row['max']:.2f}]" + ) + + if "MDGs" in fw_means.index and "SDGs" in fw_means.index: + gap = fw_means.loc["MDGs", "mean"] - fw_means.loc["SDGs", "mean"] + self.logger.info( + f"\n Gap MDGs-SDGs = {gap:.2f} poin" + + ( + " → SUBSTANTIF (indikator SDGs mengukur deprivasi lebih dalam)" + if abs(gap) > 10 else + " → dalam batas wajar" + ) + ) + def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] @@ -1202,6 +1336,10 @@ class FoodSecurityAggregator: self.logger.info("\n" + "=" * 70) self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold") self.logger.info(f" Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + self.logger.info( + " NORMALISASI: norm_value dari analytical_layer (satu referensi global)." + "\n Tidak ada rescaling per-framework. MDGs dan SDGs comparable." + ) self.logger.info("=" * 70) self.load_data() @@ -1250,6 +1388,8 @@ if __name__ == "__main__": print("=" * 70) print("FOOD SECURITY AGGREGATION -> fs_asean_gold") print(f"Condition threshold: bad<{THRESHOLD_BAD}, moderate {THRESHOLD_BAD}-{THRESHOLD_GOOD}, good>{THRESHOLD_GOOD}") + print("NORMALISASI: satu referensi global per indikator (dari analytical_layer).") + print("Tidak ada rescaling per-framework. MDGs dan SDGs comparable.") print("=" * 70) logger = setup_logging() diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index 9cbc343..682035e 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -1,53 +1,27 @@ """ -BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION -Semua agregasi pakai norm_value dari _get_norm_value_df() +BIGQUERY CLEANED LAYER ETL +Kimball Data Warehouse Architecture -PERBAIKAN (vs versi sebelumnya): -───────────────────────────────────────────────────────────────────────────── -1. NORMALIZE_FRAMEWORKS_JOINTLY dihapus. - Setelah perbaikan di analytical_layer, norm_value_1_100 sudah dihitung - SEKALI per indikator dari seluruh data (semua tahun, semua negara). - Tidak ada lagi rescaling ulang per-framework di layer ini. - Semua framework (MDGs, SDGs, Total) menggunakan norm_value yang SAMA - sebagai basis, sehingga skor mereka berada pada skala yang setara. +Kimball ETL Flow yang dijalankan file ini: + Input : STAGING layer (Silver) — staging_integrated (fs_asean_silver) + Output : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver) + Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit) -2. _get_norm_value_df() DISEDERHANAKAN. - Fungsi ini sekarang hanya membaca kolom norm_value_1_100 yang sudah ada - di fact_asean_food_security_selected (hasil dari analytical_layer), - kemudian memetakan ke skala 0-1 untuk keperluan agregasi internal. - TIDAK ada lagi normalisasi ulang per indikator di sini. +Classes: + CleanedDataLoader — Cleaning, enrichment, & load ke Silver layer -3. global_minmax() TETAP DIGUNAKAN untuk mengubah rata-rata norm (0-1) menjadi - skor 1-100 di level agregasi (pillar / country / asean). - Ini adalah rescaling level AGREGAT (bukan level indikator), sehingga masih - valid dan tidak menimbulkan bias komparabilitas. - -4. Framework MDGs dan SDGs sekarang comparable: - - Jika skor SDGs < skor MDGs → memang karena indikator SDGs mengukur - dimensi deprivasi yang lebih dalam (substantif), bukan artefak teknis. - - Log diagnostik ditambahkan untuk memverifikasi ini. - -5. Kolom 'condition' (good/moderate/bad) TETAP dengan threshold yang sama. - -Simpan 6 tabel ke fs_asean_gold (layer='gold'): - - agg_pillar_composite - - agg_pillar_by_country - - agg_framework_by_country - - agg_framework_asean - - agg_narrative_overview - - agg_narrative_pillar - -SOURCE TABLE: fact_asean_food_security_selected +Usage: + python bigquery_cleaned_layer.py """ import pandas as pd import numpy as np from datetime import datetime import logging +from typing import Dict import json -import sys as _sys -from scripts.bigquery_config import get_bigquery_client +from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id from scripts.bigquery_helpers import ( log_update, load_to_bigquery, @@ -59,1347 +33,545 @@ from google.cloud import bigquery # ============================================================================= -# KONSTANTA GLOBAL +# LOAD STAGING DATA # ============================================================================= -DIRECTION_INVERT_KEYWORDS = frozenset({ - "negative", "lower_better", "lower_is_better", "inverse", "neg", -}) - -DIRECTION_POSITIVE_KEYWORDS = frozenset({ - "positive", "higher_better", "higher_is_better", -}) - -# Threshold kondisi — fixed absolute, skala 1-100 -THRESHOLD_BAD = 40.0 -THRESHOLD_GOOD = 60.0 +def load_staging_data(client: bigquery.Client) -> pd.DataFrame: + """Load data dari staging_integrated (STAGING/Silver layer).""" + print("\nLoading data from staging_integrated (fs_asean_silver)...") + df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver') + print(f" Loaded : {len(df_staging):,} rows") + print(f" Columns : {len(df_staging.columns)}") + print(f" Sources : {df_staging['source'].nunique()}") + print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") + print(f" Countries : {df_staging['country'].nunique()}") + print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") + return df_staging -def assign_condition(score) -> str: +# ============================================================================= +# COLUMN CONSTRAINT HELPERS +# ============================================================================= + +COLUMN_CONSTRAINTS = { + 'source' : 20, + 'indicator_original' : 255, + 'indicator_standardized': 255, + 'country' : 100, + 'year_range' : 20, + 'unit' : 20, + 'pillar' : 20, + 'direction' : 15, +} + + +def truncate_string(value, max_length: int) -> str: + """Truncate string ke max_length, return as-is jika None/NaN.""" + if pd.isna(value): + return value + value_str = str(value) + return value_str[:max_length] if len(value_str) > max_length else value_str + + +def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame: """ - Assign kondisi berdasarkan score skala 1-100 (direction-aware, nilai tinggi = lebih baik). - Returns: 'good' / 'moderate' / 'bad' / None jika NaN + Apply column length constraints sesuai schema tabel. + Melaporkan kolom mana yang dipotong dan contohnya. """ - if score is None or (isinstance(score, float) and np.isnan(score)): - return None - if score > THRESHOLD_GOOD: - return 'good' - if score < THRESHOLD_BAD: - return 'bad' - return 'moderate' + df_constrained = df.copy() + truncation_report = {} - -# ============================================================================= -# Windows CP1252 safe logging -# ============================================================================= - -class _SafeStreamHandler(logging.StreamHandler): - def emit(self, record): - try: - super().emit(record) - except UnicodeEncodeError: - try: - msg = self.format(record) - self.stream.write( - msg.encode("utf-8", errors="replace").decode("ascii", errors="replace") - + self.terminator - ) - self.flush() - except Exception: - self.handleError(record) - - -# ============================================================================= -# HELPERS -# ============================================================================= - -def _should_invert(direction: str, logger=None, context: str = "") -> bool: - d = str(direction).lower().strip() - if d in DIRECTION_INVERT_KEYWORDS: - return True - if d in DIRECTION_POSITIVE_KEYWORDS: - return False - if logger: - logger.warning( - f" [DIRECTION WARNING] Unknown direction '{direction}' " - f"{'(' + context + ')' if context else ''}. Defaulting to positive (no invert)." + for column, max_length in COLUMN_CONSTRAINTS.items(): + if column not in df_constrained.columns: + continue + mask = ( + df_constrained[column].notna() & + (df_constrained[column].astype(str).str.len() > max_length) ) - return False + truncated_count = mask.sum() + if truncated_count > 0: + truncation_report[column] = { + 'count' : int(truncated_count), + 'max_length': max_length, + 'examples' : df_constrained[mask][column].head(3).tolist() + } + df_constrained[column] = df_constrained[column].apply( + lambda x: truncate_string(x, max_length) + ) + + if truncation_report: + print("\n Column Truncations Applied:") + for column, info in truncation_report.items(): + print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars") + else: + print("\n No truncations needed — all values within constraints") + + return df_constrained -def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: +# ============================================================================= +# COUNTRY NAME STANDARDIZATION +# ============================================================================= + +ASEAN_MAPPING = { + 'BRN' : 'Brunei Darussalam', + 'BRUNEI' : 'Brunei Darussalam', + 'BRUNEI DARUSSALAM' : 'Brunei Darussalam', + 'KHM' : 'Cambodia', + 'CAMBODIA' : 'Cambodia', + 'IDN' : 'Indonesia', + 'INDONESIA' : 'Indonesia', + 'LAO' : 'Laos', + 'LAOS' : 'Laos', + "LAO PEOPLE'S DEMOCRATIC REPUBLIC" : 'Laos', + 'LAO PDR' : 'Laos', + 'MYS' : 'Malaysia', + 'MALAYSIA' : 'Malaysia', + 'MMR' : 'Myanmar', + 'MYANMAR' : 'Myanmar', + 'BURMA' : 'Myanmar', + 'PHL' : 'Philippines', + 'PHILIPPINES' : 'Philippines', + 'SGP' : 'Singapore', + 'SINGAPORE' : 'Singapore', + 'THA' : 'Thailand', + 'THAILAND' : 'Thailand', + 'VNM' : 'Vietnam', + 'VIETNAM' : 'Vietnam', + 'VIET NAM' : 'Vietnam', +} + + +def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'country') -> tuple: """ - Rescale series ke rentang [lo, hi]. - Digunakan untuk mengubah norm agregat (0-1) menjadi skor 1-100 di level - pillar / country / asean. Bukan untuk normalisasi indikator mentah. + Standardize country names untuk ASEAN. + Ensures country names within varchar(100) constraint. + + Returns: + tuple: (df_clean, report_dict) """ - values = series.dropna().values - if len(values) == 0: - return pd.Series(np.nan, index=series.index) - v_min, v_max = values.min(), values.max() - if v_min == v_max: - return pd.Series((lo + hi) / 2.0, index=series.index) - result = np.full(len(series), np.nan) - not_nan = series.notna() - raw = series[not_nan].values - result[not_nan.values] = lo + (raw - v_min) / (v_max - v_min) * (hi - lo) - return pd.Series(result, index=series.index) + df_clean = df.copy() + def map_country(country): + if pd.isna(country): + return country + s = str(country).strip() + mapped = ASEAN_MAPPING.get(s.upper(), s) + return mapped[:100] if len(mapped) > 100 else mapped -def add_yoy(df: pd.DataFrame, group_cols: list, score_col: str) -> pd.DataFrame: - df = df.sort_values(group_cols + ["year"]).reset_index(drop=True) - if group_cols: - df["year_over_year_change"] = df.groupby(group_cols)[score_col].diff() - else: - df["year_over_year_change"] = df[score_col].diff() - return df + original = df_clean[country_column].copy() + df_clean[country_column] = df_clean[country_column].apply(map_country) + changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new} - -def safe_int( - series: pd.Series, fill: int = 0, col_name: str = "", logger=None -) -> pd.Series: - n_nan = series.isna().sum() - if n_nan > 0 and logger: - logger.warning( - f" [NaN WARNING] Kolom '{col_name}' punya {n_nan} NaN -> di-fill dengan {fill}" - ) - return series.fillna(fill).astype(int) - - -def check_and_dedup( - df: pd.DataFrame, key_cols: list, context: str = "", logger=None -) -> pd.DataFrame: - dupes = df.duplicated(subset=key_cols, keep=False) - if dupes.any(): - n_dupes = dupes.sum() - if logger: - logger.warning( - f" [DEDUP WARNING] {context}: {n_dupes} duplikat rows pada {key_cols}. " - f"Di-aggregate dengan mean." - ) - numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() - agg_dict = { - c: ("mean" if c in numeric_cols else "first") - for c in df.columns if c not in key_cols - } - df = df.groupby(key_cols, as_index=False).agg(agg_dict) - return df - - -def add_condition_column(df: pd.DataFrame, score_col: str) -> pd.DataFrame: - df['condition'] = df[score_col].apply(assign_condition) - return df - - -def log_condition_summary(df: pd.DataFrame, context: str, logger) -> None: - dist = df['condition'].value_counts() - logger.info( - f" Condition distribution ({context}): " + - " | ".join(f"{c}: {n:,}" for c, n in dist.items()) - ) + return df_clean, { + 'countries_mapped': len(set(changes.keys())), + 'changes' : changes, + } # ============================================================================= -# NARRATIVE BUILDER FUNCTIONS (tidak berubah) +# PILLAR CLASSIFICATION # ============================================================================= -def _fmt_score(score) -> str: - if score is None or (isinstance(score, float) and np.isnan(score)): - return "N/A" - return f"{score:.2f}" +def assign_pillar(indicator_name: str) -> str: + """ + Assign pillar berdasarkan keyword indikator. + Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' + All <= 20 chars (varchar(20) constraint). + """ + if pd.isna(indicator_name): + return 'Supporting' + ind = str(indicator_name).lower() + for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: + if kw in ind: + return 'Supporting' -def _fmt_delta(delta) -> str: - if delta is None or (isinstance(delta, float) and np.isnan(delta)): - return "N/A" - sign = "+" if delta >= 0 else "" - return f"{sign}{delta:.2f}" + if any(kw in ind for kw in [ + 'adequacy', 'protein supply', 'supply of protein', + 'dietary energy supply', 'share of dietary energy', 'derived from cereals' + ]): + return 'Availability' + if any(kw in ind for kw in [ + 'variability', 'cereal import dependency', 'arable land equipped', + 'political stability', 'value of food imports in total' + ]): + return 'Stability' -def _build_overview_narrative( - year, n_mdg, n_sdg, n_total_ind, score, yoy_val, yoy_pct, - prev_year, prev_score, ranking_list, - most_improved_country, most_improved_delta, - most_declined_country, most_declined_delta, -) -> str: - parts_ind = [] - if n_mdg > 0: - parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") - if n_sdg > 0: - parts_ind.append(f"{n_sdg} SDG indicator{'s' if n_sdg > 1 else ''}") + if any(kw in ind for kw in [ + 'gdp', 'gross domestic product', 'rail lines', 'road density', + 'number of moderately', 'number of severely', + 'number of people undernourished', 'prevalence of moderate', + 'prevalence of severe', 'prevalence of undernourishment', 'food insecure' + ]): + return 'Access' - if parts_ind: - ind_detail = " and ".join(parts_ind) - sent1 = ( - f"In {year}, the ASEAN food security assessment incorporated a total of " - f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}, " - f"consisting of {ind_detail}." - ) - else: - sent1 = ( - f"In {year}, the ASEAN food security assessment incorporated " - f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." - ) + if any(kw in ind for kw in [ + 'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity', + 'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water', + 'sanitation', 'children under 5', 'newborns with low', + 'women of reproductive' + ]): + return 'Utilization' - if yoy_val is not None and prev_score is not None: - direction_word = "increasing" if yoy_val >= 0 else "decreasing" - pct_clause = "" - if yoy_pct is not None: - abs_pct = abs(yoy_pct) - trend_word = "improvement" if yoy_val >= 0 else "decline" - pct_clause = f", which represents a {abs_pct:.2f}% {trend_word} year-over-year" - sent2 = ( - f"The ASEAN overall score (Total framework) reached {_fmt_score(score)}, " - f"{direction_word} by {abs(yoy_val):.2f} points compared to the previous year " - f"({_fmt_score(prev_score)} in {prev_year}){pct_clause}." - ) - else: - sent2 = ( - f"The ASEAN overall score (Total framework) reached {_fmt_score(score)} in {year}; " - f"no prior-year data is available for year-over-year comparison." - ) - - sent3 = "" - if ranking_list: - first = ranking_list[0] - last = ranking_list[-1] - middle = ranking_list[1:-1] - if len(ranking_list) == 1: - sent3 = ( - f"In terms of country performance, {first['country_name']} was the only " - f"country assessed, scoring {_fmt_score(first['score'])} in {year}." - ) - elif len(ranking_list) == 2: - sent3 = ( - f"In terms of country performance, {first['country_name']} led the region " - f"with a score of {_fmt_score(first['score'])}, while " - f"{last['country_name']} recorded the lowest score of " - f"{_fmt_score(last['score'])} in {year}." - ) - else: - middle_parts = [f"{c['country_name']} ({_fmt_score(c['score'])})" for c in middle] - middle_str = ( - middle_parts[0] if len(middle_parts) == 1 - else ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}" - ) - sent3 = ( - f"In terms of country performance, {first['country_name']} led the region " - f"with a score of {_fmt_score(first['score'])}, followed by {middle_str}. " - f"At the other end, {last['country_name']} recorded the lowest score " - f"of {_fmt_score(last['score'])} in {year}." - ) - - sent4_parts = [] - if most_improved_country and most_improved_delta is not None: - sent4_parts.append( - f"the most notable improvement was seen in {most_improved_country}, " - f"which gained {_fmt_delta(most_improved_delta)} points from the previous year" - ) - if most_declined_country and most_declined_delta is not None: - if most_declined_delta < 0: - sent4_parts.append( - f"while {most_declined_country} experienced the largest decline " - f"of {_fmt_delta(most_declined_delta)} points" - ) - else: - sent4_parts.append( - f"while {most_declined_country} recorded the smallest gain " - f"of {_fmt_delta(most_declined_delta)} points" - ) - - sent4 = "" - if sent4_parts: - sent4 = ", ".join(sent4_parts) + "." - sent4 = sent4[0].upper() + sent4[1:] - - return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) - - -def _build_pillar_narrative( - year, pillar_name, pillar_score, rank_in_year, n_pillars, yoy_val, - top_country, top_country_score, bot_country, bot_country_score, - strongest_pillar, strongest_score, weakest_pillar, weakest_score, - most_improved_pillar, most_improved_delta, - most_declined_pillar, most_declined_delta, -) -> str: - rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") - sent1 = ( - f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " - f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." - ) - - sent2 = "" - if strongest_pillar and weakest_pillar: - if strongest_pillar == pillar_name: - sent2 = ( - f"This made {pillar_name} the strongest performing pillar in {year}, " - f"compared to the weakest pillar, {weakest_pillar}, " - f"which scored {_fmt_score(weakest_score)}." - ) - elif weakest_pillar == pillar_name: - sent2 = ( - f"This made {pillar_name} the weakest performing pillar in {year}, " - f"compared to the strongest pillar, {strongest_pillar}, " - f"which scored {_fmt_score(strongest_score)}." - ) - else: - sent2 = ( - f"Across all pillars in {year}, {strongest_pillar} was the strongest " - f"(score: {_fmt_score(strongest_score)}), while {weakest_pillar} " - f"was the weakest (score: {_fmt_score(weakest_score)})." - ) - - sent3 = "" - if top_country and bot_country: - if top_country != bot_country: - sent3 = ( - f"Within the {pillar_name} pillar, {top_country} led with a score of " - f"{_fmt_score(top_country_score)}, while {bot_country} recorded the lowest " - f"score of {_fmt_score(bot_country_score)}." - ) - else: - sent3 = ( - f"Within the {pillar_name} pillar, {top_country} was the only country " - f"with available data, scoring {_fmt_score(top_country_score)}." - ) - - if yoy_val is not None: - direction_word = "improved" if yoy_val >= 0 else "declined" - sent4 = ( - f"Compared to the previous year, the {pillar_name} pillar " - f"{direction_word} by {abs(yoy_val):.2f} points" - ) - else: - sent4 = ( - f"No prior-year data is available to calculate year-over-year change " - f"for the {pillar_name} pillar in {year}" - ) - - if (most_improved_pillar and most_improved_delta is not None - and most_declined_pillar and most_declined_delta is not None - and most_improved_pillar != most_declined_pillar): - sent4 += ( - f". Across all pillars, {most_improved_pillar} showed the greatest improvement " - f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " - f"recorded the largest decline ({_fmt_delta(most_declined_delta)} pts)" - ) - - sent4 += "." - sent4 = sent4[0].upper() + sent4[1:] - - return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) + return 'Supporting' # ============================================================================= -# MAIN CLASS +# DIRECTION CLASSIFICATION # ============================================================================= -class FoodSecurityAggregator: +def assign_direction(indicator_name: str) -> str: + """ + Assign direction berdasarkan indikator. + Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars) + Both <= 15 chars (varchar(15) constraint). + """ + if pd.isna(indicator_name): + return 'higher_better' + ind = str(indicator_name).lower() - def __init__(self, client: bigquery.Client): - self.client = client - self.logger = logging.getLogger(self.__class__.__name__) + if 'share of dietary energy supply derived from cereals' in ind: + return 'lower_better' + + for kw in [ + 'exclusive breastfeeding', + 'dietary energy supply', + 'dietary energy supply adequacy', + 'average fat supply', + 'average protein supply', + 'supply of protein of animal origin', + ]: + if kw in ind: + return 'higher_better' + + for kw in [ + 'prevalence of undernourishment', + 'prevalence of severe food insecurity', + 'prevalence of moderate or severe food insecurity', + 'prevalence of moderate food insecurity', + 'prevalence of wasting', + 'prevalence of stunting', + 'prevalence of overweight', + 'prevalence of obesity', + 'prevalence of anemia', + 'prevalence of anaemia', + 'prevalence of low birthweight', + 'number of people undernourished', + 'number of severely food insecure', + 'number of moderately or severely food insecure', + 'number of children under 5 years affected by wasting', + 'number of children under 5 years of age who are overweight', + 'number of children under 5 years of age who are stunted', + 'number of newborns with low birthweight', + 'number of obese adults', + 'number of women of reproductive age', + 'percentage of children under 5 years affected by wasting', + 'percentage of children under 5 years of age who are overweight', + 'percentage of children under 5 years of age who are stunted', + 'cereal import dependency', + 'import dependency', + 'value of food imports in total merchandise exports', + 'value of food imports', + 'variability of food production', + 'variability of food supply', + 'per capita food production variability', + 'per capita food supply variability', + 'coefficient of variation', + 'incidence of caloric losses', + 'food losses', + 'indicator of food price anomalies', + 'proportion of local breeds classified as being at risk', + 'agricultural export subsidies', + ]: + if kw in ind: + return 'lower_better' + + return 'higher_better' + + +# ============================================================================= +# CLEANED DATA LOADER +# ============================================================================= + +class CleanedDataLoader: + """ + Loader untuk cleaned integrated data ke STAGING layer (Silver). + + Kimball context: + Input : staging_integrated -> STAGING (Silver) — fs_asean_silver + Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver + Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit + + Pipeline steps: + 1. Standardize country names (ASEAN) + 2. Remove missing values + 3. Remove duplicates + 4. Add pillar & direction classification + 5. Apply column constraints + 6. Load ke BigQuery + 7. Log ke Audit layer + """ + + SCHEMA = [ + bigquery.SchemaField("source", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"), + bigquery.SchemaField("country", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"), + bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + ] + + def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'): + self.client = client + self.load_mode = load_mode + self.logger = logging.getLogger(self.__class__.__name__) self.logger.propagate = False + self.table_name = 'cleaned_integrated' + self.target_layer = 'silver' - self.load_metadata = { - "agg_pillar_composite": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, - "agg_pillar_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, - "agg_framework_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, - "agg_framework_asean": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, - "agg_narrative_overview": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, - "agg_narrative_pillar": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + self.metadata = { + 'source_class' : self.__class__.__name__, + 'table_name' : self.table_name, + 'start_time' : None, + 'end_time' : None, + 'duration_seconds' : None, + 'rows_fetched' : 0, + 'rows_transformed' : 0, + 'rows_loaded' : 0, + 'load_mode' : load_mode, + 'validation_metrics': {} } - self.df = None - self.dims = {} + # ------------------------------------------------------------------ + # STEP METHODS + # ------------------------------------------------------------------ - self.sdgs_start_year = None - self.mdgs_indicator_ids = set() - self.sdgs_indicator_ids = set() + def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame: + print("\n [Step 1/5] Standardize country names...") + df, report = standardize_country_names_asean(df, country_column='country') + print(f" ASEAN countries mapped : {report['countries_mapped']}") + unique_countries = sorted(df['country'].unique()) + print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}") + log_update(self.client, 'STAGING', 'staging_integrated', + 'standardize_asean', report['countries_mapped']) + return df - # ========================================================================= - # STEP 1: Load data - # ========================================================================= + def _step_remove_missing(self, df: pd.DataFrame) -> pd.DataFrame: + print("\n [Step 2/5] Remove missing values...") + rows_before = len(df) + df_clean = df.dropna(subset=list(df.columns)) + rows_after = len(df_clean) + removed = rows_before - rows_after + print(f" Rows before : {rows_before:,}") + print(f" Rows after : {rows_after:,}") + print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)") + print(f" Retention : {rows_after/rows_before*100:.1f}%") + return df_clean - def load_data(self): - self.logger.info("=" * 70) - self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") - self.logger.info("=" * 70) - - self.df = read_from_bigquery( - self.client, "fact_asean_food_security_selected", layer='gold' + def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: + print("\n [Step 3/5] Remove duplicates...") + exact_dups = df.duplicated().sum() + data_dups = df.duplicated( + subset=['indicator_standardized', 'country', 'year', 'value'] + ).sum() + print(f" Exact duplicates : {exact_dups:,}") + print(f" Data duplicates : {data_dups:,}") + rows_before = len(df) + df_clean = df.drop_duplicates( + subset=['indicator_standardized', 'country', 'year'], keep='first' ) - self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows") + removed = rows_before - len(df_clean) + print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)") + return df_clean - required_cols = { - "country_id", "country_name", - "indicator_id", "indicator_name", "direction", "framework", - "pillar_id", "pillar_name", - "time_id", "year", "value", - # PERBAIKAN: norm_value_1_100 wajib ada (hasil analytical_layer) - "norm_value_1_100", + def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: + print("\n [Step 4/5] Add pillar & direction classification...") + df = df.copy() + + df['pillar'] = df['indicator_standardized'].apply(assign_pillar) + df['direction'] = df['indicator_standardized'].apply(assign_direction) + + pillar_counts = df['pillar'].value_counts() + print(f" Pillar distribution:") + for pillar, count in pillar_counts.items(): + print(f" - {pillar}: {count:,}") + + direction_counts = df['direction'].value_counts() + print(f" Direction distribution:") + for direction, count in direction_counts.items(): + pct = count / len(df) * 100 + print(f" - {direction}: {count:,} ({pct:.1f}%)") + + return df + + def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: + print("\n [Step 5/5] Apply column constraints...") + return apply_column_constraints(df) + + # ------------------------------------------------------------------ + # VALIDATION + # ------------------------------------------------------------------ + + def validate_data(self, df: pd.DataFrame) -> Dict: + validation = { + 'total_rows' : int(len(df)), + 'total_columns' : int(len(df.columns)), + 'duplicate_count' : int(df.duplicated().sum()), + 'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)), + 'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2)) } - missing_cols = required_cols - set(self.df.columns) - if missing_cols: - raise ValueError( - f"Kolom berikut tidak ditemukan: {missing_cols}\n" - f"Pastikan pipeline dijalankan berurutan:\n" - f" 1. bigquery_cleaned_layer.py\n" - f" 2. bigquery_dimensional_model.py\n" - f" 3. bigquery_analytical_layer.py ← harus dijalankan dulu\n" - f" 4. bigquery_analysis_layer.py (file ini)" - ) - - self.df["direction"] = self.df["direction"].fillna("positive") - self.df["framework"] = self.df["framework"].fillna("MDGs") - self.df["norm_value_1_100"] = self.df["norm_value_1_100"].astype(float) - - dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() - self.logger.info(f"\n Distribusi direction per indikator:") - for d, cnt in dir_dist.items(): - tag = "INVERT" if _should_invert(d, self.logger, "load_data") else "normal" - self.logger.info(f" {d:<25} : {cnt:>3} [{tag}]") - - fw_dist = self.df.drop_duplicates("indicator_id")["framework"].value_counts() - self.logger.info(f"\n Distribusi framework per indikator:") - for fw, cnt in fw_dist.items(): - self.logger.info(f" {fw:<10} : {cnt:>3}") - - self.logger.info( - f"\n Rows: {len(self.df):,} | Negara: {self.df['country_id'].nunique()} | " - f"Indikator: {self.df['indicator_id'].nunique()} | " - f"Tahun: {int(self.df['year'].min())}-{int(self.df['year'].max())}" - ) - - # Diagnostik: cek komparabilitas norm antar framework - self._log_norm_comparability_diagnostics() - - def _log_norm_comparability_diagnostics(self): - """ - Log diagnostik untuk memverifikasi bahwa norm_value_1_100 sudah comparable - antar framework setelah perbaikan di analytical_layer. - """ - self.logger.info(f"\n [DIAGNOSTIK] Komparabilitas norm_value_1_100 antar framework:") - self.logger.info(f" {'─'*60}") - - fw_stats = ( - self.df.groupby('framework')['norm_value_1_100'] - .agg(['mean', 'median', 'std', 'min', 'max']) - .round(2) - ) - for fw, row in fw_stats.iterrows(): - self.logger.info( - f" {fw:<8} mean={row['mean']:>6.2f} median={row['median']:>6.2f} " - f"std={row['std']:>5.2f} range=[{row['min']:.2f},{row['max']:.2f}]" - ) - - mdgs_mean = self.df[self.df['framework'] == 'MDGs']['norm_value_1_100'].mean() - sdgs_mean = self.df[self.df['framework'] == 'SDGs']['norm_value_1_100'].mean() - gap = mdgs_mean - sdgs_mean - - if abs(gap) > 15: - self.logger.info( - f"\n [INFO] Gap MDGs-SDGs = {gap:.2f} poin." - f"\n Ini adalah perbedaan SUBSTANTIF (bukan artefak normalisasi):" - f"\n Indikator SDGs mengukur deprivasi yang lebih dalam" - f"\n (FIES, stunting, wasting, anaemia) vs indikator MDGs." - f"\n Gap ini valid untuk dilaporkan sebagai temuan analisis." - ) - else: - self.logger.info( - f"\n [OK] Gap MDGs-SDGs = {gap:.2f} poin — dalam batas wajar." - ) - - # ========================================================================= - # STEP 1b: Klasifikasi indikator - # ========================================================================= - - def _classify_indicators(self): - self.logger.info("\n" + "=" * 70) - self.logger.info("STEP 1b: KLASIFIKASI INDIKATOR -> MDGs / SDGs") - self.logger.info("=" * 70) - - self.mdgs_indicator_ids = set( - self.df[self.df["framework"] == "MDGs"]["indicator_id"].unique().tolist() - ) - self.sdgs_indicator_ids = set( - self.df[self.df["framework"] == "SDGs"]["indicator_id"].unique().tolist() - ) - - _PROXY_KW = frozenset(['food insecurity', 'anemia', 'anaemia']) - proxy_mask = ( - (self.df["framework"] == "SDGs") & - self.df["indicator_name"].str.lower().apply( - lambda n: any(kw in n for kw in _PROXY_KW) - ) - ) - df_proxy = self.df[proxy_mask] - - if not df_proxy.empty: - self.sdgs_start_year = int(df_proxy["year"].min()) - self.logger.info( - f"\n sdgs_start_year = {self.sdgs_start_year} " - f"(dari proxy FIES/anaemia di tabel)" - ) - else: - sdgs_rows = self.df[self.df["framework"] == "SDGs"] - if not sdgs_rows.empty: - self.sdgs_start_year = int(sdgs_rows["year"].min()) - self.logger.warning( - f" [WARN] Proxy tidak ditemukan, fallback ke min(year) SDGs: " - f"{self.sdgs_start_year}" - ) - else: - self.sdgs_start_year = int(self.df["year"].max()) + 1 - self.logger.warning( - f" [WARN] Tidak ada SDGs. sdgs_start_year = {self.sdgs_start_year}" - ) - - self.logger.info(f" MDGs : {len(self.mdgs_indicator_ids)} indikator") - self.logger.info(f" SDGs : {len(self.sdgs_indicator_ids)} indikator") - - for fw in ["MDGs", "SDGs"]: - fw_inds = ( - self.df[self.df["framework"] == fw] - .drop_duplicates("indicator_id")[["indicator_id", "indicator_name"]] - .sort_values("indicator_name") - ) - self.logger.info(f"\n {fw} indicators ({len(fw_inds)}):") - for _, row in fw_inds.iterrows(): - self.logger.info(f" [{int(row['indicator_id'])}] {row['indicator_name']}") - - # ========================================================================= - # CORE HELPER: _get_norm_value_df() - # ========================================================================= - # PERBAIKAN: - # Fungsi ini TIDAK lagi melakukan normalisasi ulang per indikator. - # Kolom norm_value_1_100 sudah dihitung sekali di analytical_layer - # dengan referensi global (semua tahun, semua negara, per indikator). - # - # Yang dilakukan di sini hanya: - # 1. Membaca norm_value_1_100 dari df - # 2. Mengubah skala 1-100 → 0-1 (untuk keperluan rata-rata agregat) - # dengan rumus linear: norm_0_1 = (norm_1_100 - 1) / 99 - # - # Rescaling agregat (0-1 → 1-100) tetap dilakukan via global_minmax() - # di masing-masing fungsi calc_* untuk menghasilkan skor level pillar/country/asean. - # ========================================================================= - - def _get_norm_value_df(self) -> pd.DataFrame: - """ - Mengembalikan df dengan kolom 'norm_value' (skala 0-1) yang diturunkan - dari norm_value_1_100 (sudah ada di source, dihitung di analytical_layer). - - Transformasi: norm_value = (norm_value_1_100 - 1) / 99 - Ini adalah transformasi LINEAR — tidak mengubah urutan relatif antar indikator, - negara, atau tahun. Komparabilitas lintas framework tetap terjaga. - """ - df = self.df.copy() - - # Konversi 1-100 → 0-1 secara linear - df["norm_value"] = np.where( - df["norm_value_1_100"].notna(), - (df["norm_value_1_100"] - 1.0) / 99.0, - np.nan - ) - - n_null = df["norm_value"].isna().sum() - n_valid = df["norm_value"].notna().sum() - self.logger.debug( - f" _get_norm_value_df: {n_valid:,} valid | {n_null:,} null " - f"(dari norm_value_1_100 analytical_layer)" - ) - - return df - - # ========================================================================= - # STEP 2: agg_pillar_composite - # ========================================================================= - - def calc_pillar_composite(self) -> pd.DataFrame: - table_name = "agg_pillar_composite" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 2: {table_name}") - self.logger.info("=" * 70) - - df_normed = self._get_norm_value_df() - - df = ( - df_normed - .groupby(["pillar_id", "pillar_name", "year"]) - .agg( - pillar_norm =("norm_value", "mean"), - n_indicators =("indicator_id", "nunique"), - n_countries =("country_id", "nunique"), - ) - .reset_index() - ) - - df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) - df["rank_in_year"] = ( - df.groupby("year")["pillar_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") - df = add_condition_column(df, "pillar_score_1_100") - log_condition_summary(df, table_name, self.logger) - - df["pillar_id"] = df["pillar_id"].astype(int) - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["n_countries"] = safe_int(df["n_countries"], col_name="n_countries", logger=self.logger) - df["rank_in_year"] = df["rank_in_year"].astype(int) - df["pillar_norm"] = df["pillar_norm"].astype(float) - df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) - - schema = [ - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df - - # ========================================================================= - # STEP 3: agg_pillar_by_country - # ========================================================================= - - def calc_pillar_by_country(self) -> pd.DataFrame: - table_name = "agg_pillar_by_country" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 3: {table_name}") - self.logger.info("=" * 70) - - df_normed = self._get_norm_value_df() - - df = ( - df_normed - .groupby(["country_id", "country_name", "pillar_id", "pillar_name", "year"]) - .agg(pillar_country_norm=("norm_value", "mean")) - .reset_index() - ) - - df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) - df["rank_in_pillar_year"] = ( - df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") - df = add_condition_column(df, "pillar_country_score_1_100") - log_condition_summary(df, table_name, self.logger) - - df["country_id"] = df["country_id"].astype(int) - df["pillar_id"] = df["pillar_id"].astype(int) - df["year"] = df["year"].astype(int) - df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) - df["pillar_country_norm"] = df["pillar_country_norm"].astype(float) - df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) - - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df - - # ========================================================================= - # STEP 4: agg_framework_by_country - # ========================================================================= - # PERBAIKAN: - # - Flag NORMALIZE_FRAMEWORKS_JOINTLY dihapus. - # - Tidak ada lagi rescaling ulang per-framework di sini. - # - Semua framework (Total, MDGs, SDGs) menggunakan norm_value yang SAMA - # sebagai basis (sudah comparable dari analytical_layer). - # - global_minmax() hanya digunakan SEKALI untuk mengubah norm agregat - # (rata-rata norm_value per country-framework-year) menjadi skor 1-100 - # di level country-framework, menggunakan SATU POOL DATA BERSAMA. - # - Dengan ini, perbandingan skor MDGs vs SDGs per negara adalah valid. - # ========================================================================= - - def _calc_country_composite_inmemory(self) -> pd.DataFrame: - df_normed = self._get_norm_value_df() - df = ( - df_normed - .groupby(["country_id", "country_name", "year"]) - .agg( - composite_score=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique"), - ) - .reset_index() - ) - df["score_1_100"] = global_minmax(df["composite_score"]) - df["rank_in_asean"] = ( - df.groupby("year")["score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["country_id"], "score_1_100") - df["country_id"] = df["country_id"].astype(int) - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["composite_score"] = df["composite_score"].astype(float) - df["score_1_100"] = df["score_1_100"].astype(float) - df["rank_in_asean"] = df["rank_in_asean"].astype(int) - return df - - def calc_framework_by_country(self) -> pd.DataFrame: - table_name = "agg_framework_by_country" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 4: {table_name}") - self.logger.info("=" * 70) - self.logger.info( - " [PERBAIKAN] Semua framework di-aggregate dari norm_value yang SAMA." - "\n Tidak ada rescaling per-framework. Skor MDGs dan SDGs comparable." - ) - - country_composite = self._calc_country_composite_inmemory() - df_normed = self._get_norm_value_df() - parts = [] - - # ── Layer TOTAL ─────────────────────────────────────────────────────── - agg_total = ( - country_composite[[ - "country_id", "country_name", "year", - "score_1_100", "n_indicators", "composite_score" - ]] - .copy() - .rename(columns={ - "score_1_100" : "framework_score_1_100", - "composite_score": "framework_norm" - }) - ) - agg_total["framework"] = "Total" - parts.append(agg_total) - - # ── Layer MDGs pre-SDGs (tahun sebelum sdgs_start_year) ────────────── - pre_sdgs_rows = country_composite[ - country_composite["year"] < self.sdgs_start_year - ].copy() - if not pre_sdgs_rows.empty: - mdgs_pre = ( - pre_sdgs_rows[[ - "country_id", "country_name", "year", - "score_1_100", "n_indicators", "composite_score" - ]] - .copy() - .rename(columns={ - "score_1_100" : "framework_score_1_100", - "composite_score": "framework_norm" - }) - ) - mdgs_pre["framework"] = "MDGs" - parts.append(mdgs_pre) - - # ── Layer MDGs mixed (setelah SDGs masuk, hanya indikator MDGs) ────── - if self.mdgs_indicator_ids: - df_mdgs_mixed = df_normed[ - (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_mdgs_mixed.empty: - agg_mdgs_mixed = ( - df_mdgs_mixed - .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique") - ) - .reset_index() - ) - # PERBAIKAN: rescale dari POOL GABUNGAN bersama SDGs (lihat bawah) - agg_mdgs_mixed["framework"] = "MDGs" - parts.append(agg_mdgs_mixed) - - # ── Layer SDGs (hanya indikator SDGs, mulai sdgs_start_year) ───────── - if self.sdgs_indicator_ids: - df_sdgs = df_normed[ - (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_sdgs.empty: - agg_sdgs = ( - df_sdgs - .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique") - ) - .reset_index() - ) - agg_sdgs["framework"] = "SDGs" - parts.append(agg_sdgs) - - df = pd.concat(parts, ignore_index=True) - - # PERBAIKAN: Rescale framework_score_1_100 dari SATU POOL BERSAMA - # untuk semua framework (MDGs mixed + SDGs) sekaligus. - # Ini memastikan skor 60 di MDGs dan skor 60 di SDGs memiliki makna - # yang sama: posisi relatif yang sama dalam distribusi gabungan. - mixed_mask = df["framework"].isin(["MDGs", "SDGs"]) - mixed_pre_mask = (df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year) - - # Rescale pre-SDGs MDGs dari pool Total (sudah dihitung) - # → sudah ada di agg_total (framework_score_1_100 = dari country_composite) - - # Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA - post_sdgs_mask = mixed_mask & ~mixed_pre_mask & df["framework_norm"].notna() - if post_sdgs_mask.any(): - df.loc[post_sdgs_mask, "framework_score_1_100"] = global_minmax( - df.loc[post_sdgs_mask, "framework_norm"] - ) - - df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) - - # Pastikan kolom framework_score_1_100 ada untuk semua baris - if "framework_score_1_100" not in df.columns: - df["framework_score_1_100"] = np.nan - - df["rank_in_framework_year"] = ( - df.groupby(["framework", "year"])["framework_score_1_100"] - .rank(method="min", ascending=False) - .astype(int) - ) - df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") - df = add_condition_column(df, "framework_score_1_100") - log_condition_summary(df, table_name, self.logger) - - # Log diagnostik: bandingkan skor MDGs vs SDGs - self._log_framework_score_diagnostics(df, table_name) - - df["country_id"] = df["country_id"].astype(int) - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["rank_in_framework_year"] = safe_int(df["rank_in_framework_year"], col_name="rank_in_framework_year", logger=self.logger) - df["framework_norm"] = df["framework_norm"].astype(float) - df["framework_score_1_100"] = df["framework_score_1_100"].astype(float) - - self._validate_mdgs_equals_total(df, level="country") - - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df - - # ========================================================================= - # STEP 5: agg_framework_asean - # ========================================================================= - # PERBAIKAN: Sama dengan framework_by_country — tidak ada rescaling terpisah - # per framework. MDGs mixed dan SDGs di-rescale dari satu pool bersama. - # ========================================================================= - - def calc_framework_asean(self) -> pd.DataFrame: - table_name = "agg_framework_asean" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 5: {table_name}") - self.logger.info("=" * 70) - self.logger.info( - " [PERBAIKAN] MDGs mixed + SDGs di-rescale dari SATU POOL BERSAMA." - "\n Skor ASEAN MDGs dan SDGs sekarang comparable." - ) - - df_normed = self._get_norm_value_df() - country_composite = self._calc_country_composite_inmemory() - - country_norm = ( - df_normed - .groupby(["country_id", "country_name", "year"])["norm_value"] - .mean().reset_index() - .rename(columns={"norm_value": "country_norm"}) - ) - asean_overall = ( - country_norm.groupby("year") - .agg( - asean_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries =("country_norm", "count") - ) - .reset_index() - ) - asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) - - parts = [] - - # ── Layer TOTAL ─────────────────────────────────────────────────────── - total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() - total_cols = total_cols.rename(columns={ - "asean_score_1_100": "framework_score_1_100", - "asean_norm" : "framework_norm", - "n_countries" : "n_countries_with_data", - }) - n_ind_total = df_normed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) - total_cols = total_cols.merge(n_ind_total, on="year", how="left") - total_cols["framework"] = "Total" - parts.append(total_cols) - - # ── Layer MDGs pre-SDGs ─────────────────────────────────────────────── - pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() - if not pre_sdgs.empty: - mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() - mdgs_pre = mdgs_pre.rename(columns={ - "asean_score_1_100": "framework_score_1_100", - "asean_norm" : "framework_norm", - "n_countries" : "n_countries_with_data", - }) - n_ind_pre = ( - df_normed[df_normed["year"] < self.sdgs_start_year] - .groupby("year")["indicator_id"].nunique() - .reset_index().rename(columns={"indicator_id": "n_indicators"}) - ) - mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") - mdgs_pre["framework"] = "MDGs" - parts.append(mdgs_pre) - - # ── Siapkan MDGs mixed dan SDGs untuk rescaling BERSAMA ─────────────── - mixed_parts = [] - - if self.mdgs_indicator_ids: - df_mdgs_mixed = df_normed[ - (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_mdgs_mixed.empty: - cn = ( - df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) - asean_mdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data =("country_id", "count"), - ).reset_index() - n_ind_mdgs = df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) - asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") - asean_mdgs["framework"] = "MDGs" - mixed_parts.append(asean_mdgs) - - if self.sdgs_indicator_ids: - df_sdgs = df_normed[ - (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & - (df_normed["year"] >= self.sdgs_start_year) - ].copy() - if not df_sdgs.empty: - cn = ( - df_sdgs.groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) - asean_sdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data =("country_id", "count"), - ).reset_index() - n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) - asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") - asean_sdgs["framework"] = "SDGs" - mixed_parts.append(asean_sdgs) - - # PERBAIKAN: Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA - if mixed_parts: - df_mixed = pd.concat(mixed_parts, ignore_index=True) - df_mixed["framework_score_1_100"] = global_minmax(df_mixed["framework_norm"]) - parts.append(df_mixed) - - df = pd.concat(parts, ignore_index=True) - - df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) - df = add_yoy(df, ["framework"], "framework_score_1_100") - df = add_condition_column(df, "framework_score_1_100") - log_condition_summary(df, table_name, self.logger) - - # Log diagnostik: bandingkan skor ASEAN MDGs vs SDGs - self._log_framework_score_diagnostics(df, table_name) - - df["year"] = df["year"].astype(int) - df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) - df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) - for col in ["framework_norm", "std_norm", "framework_score_1_100"]: - df[col] = df[col].astype(float) - - self._validate_mdgs_equals_total(df, level="asean") - - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_countries_with_data", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), - ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._finalize(table_name, rows) - return df - - # ========================================================================= - # STEP 6 & 7: Narrative (tidak ada perubahan) - # ========================================================================= - - def calc_narrative_overview(self, df_framework_asean, df_framework_by_country): - table_name = "agg_narrative_overview" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 6: {table_name}") - self.logger.info("=" * 70) - - asean_total = df_framework_asean[df_framework_asean["framework"] == "Total"].sort_values("year").reset_index(drop=True) - score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) - country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() - ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) - records = [] - - for _, row in asean_total.iterrows(): - yr = int(row["year"]) - score = float(row["framework_score_1_100"]) - yoy = row["year_over_year_change"] - yoy_val = float(yoy) if pd.notna(yoy) else None - - yr_ind = ind_year[ind_year["year"] == yr] - n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique()) - n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique()) - n_total_ind = int(yr_ind["indicator_id"].nunique()) - prev_score = score_by_year.get(yr - 1, None) - yoy_pct = ((yoy_val / prev_score * 100) if (yoy_val is not None and prev_score and prev_score != 0) else None) - - yr_country = country_total[country_total["year"] == yr].sort_values("rank_in_framework_year").reset_index(drop=True) - ranking_list = [] - for _, cr in yr_country.iterrows(): - cr_yoy = cr.get("year_over_year_change", None) - ranking_list.append({ - "rank" : int(cr["rank_in_framework_year"]), - "country_name": str(cr["country_name"]), - "score" : round(float(cr["framework_score_1_100"]), 2), - "yoy_change" : round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, - }) - - yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) - if not yr_country_yoy.empty: - best_idx = yr_country_yoy["year_over_year_change"].idxmax() - worst_idx = yr_country_yoy["year_over_year_change"].idxmin() - most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) - most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) - most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) - most_declined_delta = round(float(yr_country_yoy.loc[worst_idx, "year_over_year_change"]), 2) - else: - most_improved_country = most_declined_country = None - most_improved_delta = most_declined_delta = None - - narrative = _build_overview_narrative( - year=yr, n_mdg=n_mdg, n_sdg=n_sdg, n_total_ind=n_total_ind, - score=score, yoy_val=yoy_val, yoy_pct=yoy_pct, - prev_year=yr-1, prev_score=prev_score, ranking_list=ranking_list, - most_improved_country=most_improved_country, most_improved_delta=most_improved_delta, - most_declined_country=most_declined_country, most_declined_delta=most_declined_delta, - ) - - records.append({ - "year" : yr, - "n_mdg_indicators" : n_mdg, - "n_sdg_indicators" : n_sdg, - "n_total_indicators" : n_total_ind, - "asean_total_score" : round(score, 2), - "yoy_change" : yoy_val, - "yoy_change_pct" : round(yoy_pct, 2) if yoy_pct is not None else None, - "country_ranking_json" : json.dumps(ranking_list, ensure_ascii=False), - "most_improved_country": most_improved_country, - "most_improved_delta" : most_improved_delta, - "most_declined_country": most_declined_country, - "most_declined_delta" : most_declined_delta, - "narrative_overview" : narrative, - }) - - df = pd.DataFrame(records) - df["year"] = df["year"].astype(int) - df["n_mdg_indicators"] = df["n_mdg_indicators"].astype(int) - df["n_sdg_indicators"] = df["n_sdg_indicators"].astype(int) - df["n_total_indicators"] = df["n_total_indicators"].astype(int) - df["asean_total_score"] = df["asean_total_score"].astype(float) - for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: - df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) - - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_sdg_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("n_total_indicators", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("asean_total_score", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_change_pct", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("country_ranking_json", "STRING", mode="REQUIRED"), - bigquery.SchemaField("most_improved_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), - ] - rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) - self._finalize(table_name, rows) - return df - - def calc_narrative_pillar(self, df_pillar_composite, df_pillar_by_country): - table_name = "agg_narrative_pillar" - self.load_metadata[table_name]["start_time"] = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 7: {table_name}") - self.logger.info("=" * 70) - - records = [] - for yr in sorted(df_pillar_composite["year"].unique()): - yr_pillars = df_pillar_composite[df_pillar_composite["year"] == yr].sort_values("rank_in_year").reset_index(drop=True) - yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] - strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None - weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None - - yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) - if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() - most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) - most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) - most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) - most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) - else: - most_improved_pillar = most_declined_pillar = None - most_improved_delta = most_declined_delta = None - - for _, prow in yr_pillars.iterrows(): - p_id = int(prow["pillar_id"]) - p_country = yr_country_pillar[yr_country_pillar["pillar_id"] == p_id].sort_values("rank_in_pillar_year").reset_index(drop=True) - top_country = bot_country = None - top_country_score = bot_country_score = None - if not p_country.empty: - top_country = str(p_country.iloc[0]["country_name"]) - top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) - bot_country = str(p_country.iloc[-1]["country_name"]) - bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) - - p_yoy = prow["year_over_year_change"] - narrative = _build_pillar_narrative( - year=yr, pillar_name=str(prow["pillar_name"]), - pillar_score=float(prow["pillar_score_1_100"]), - rank_in_year=int(prow["rank_in_year"]), n_pillars=len(yr_pillars), - yoy_val=float(p_yoy) if pd.notna(p_yoy) else None, - top_country=top_country, top_country_score=top_country_score, - bot_country=bot_country, bot_country_score=bot_country_score, - strongest_pillar=str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, - strongest_score=round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, - weakest_pillar=str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, - weakest_score=round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, - most_improved_pillar=most_improved_pillar, most_improved_delta=most_improved_delta, - most_declined_pillar=most_declined_pillar, most_declined_delta=most_declined_delta, - ) - records.append({ - "year" : yr, - "pillar_id" : p_id, - "pillar_name" : str(prow["pillar_name"]), - "pillar_score" : round(float(prow["pillar_score_1_100"]), 2), - "rank_in_year" : int(prow["rank_in_year"]), - "yoy_change" : float(p_yoy) if pd.notna(p_yoy) else None, - "top_country" : top_country, - "top_country_score" : top_country_score, - "bottom_country" : bot_country, - "bottom_country_score": bot_country_score, - "narrative_pillar" : narrative, - }) - - df = pd.DataFrame(records) - df["year"] = df["year"].astype(int) - df["pillar_id"] = df["pillar_id"].astype(int) - df["rank_in_year"] = df["rank_in_year"].astype(int) - for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: - df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) - - schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("top_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), - bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), - ] - rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) - self._finalize(table_name, rows) - return df - - # ========================================================================= - # DIAGNOSTIK & VALIDASI - # ========================================================================= - - def _log_framework_score_diagnostics(self, df: pd.DataFrame, context: str): - """ - Log perbandingan rata-rata skor per framework. - Setelah perbaikan, gap antar framework mencerminkan perbedaan substantif, - bukan artefak normalisasi. - """ - self.logger.info(f"\n [DIAGNOSTIK] Rata-rata skor per framework ({context}):") - fw_means = df.groupby("framework")["framework_score_1_100"].agg(['mean', 'min', 'max']).round(2) - for fw, row in fw_means.iterrows(): - self.logger.info( - f" {fw:<8} mean={row['mean']:>6.2f} " - f"range=[{row['min']:.2f}, {row['max']:.2f}]" - ) - - if "MDGs" in fw_means.index and "SDGs" in fw_means.index: - gap = fw_means.loc["MDGs", "mean"] - fw_means.loc["SDGs", "mean"] - self.logger.info( - f"\n Gap MDGs-SDGs = {gap:.2f} poin" - + ( - " → SUBSTANTIF (indikator SDGs mengukur deprivasi lebih dalam)" - if abs(gap) > 10 else - " → dalam batas wajar" - ) - ) - - def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): - self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") - group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] - mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) - total_pre = df[(df["framework"] == "Total") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) - if mdgs_pre.empty and total_pre.empty: - self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") - return - if mdgs_pre.empty or total_pre.empty: - self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}") - return - check = mdgs_pre.merge(total_pre, on=group_by) - max_diff = (check["mdgs_score"] - check["total_score"]).abs().max() - status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" - self.logger.info(f" -> {status} (n_checked={len(check)})") - - def _finalize(self, table_name: str, rows_loaded: int): - self.load_metadata[table_name].update({"rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now()}) - log_update(self.client, "DW", table_name, "full_load", rows_loaded) - self.logger.info(f" {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") - - def _fail(self, table_name: str, error: Exception): - self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) - self.logger.error(f" [FAIL] {table_name}: {error}") - log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) - - # ========================================================================= + if 'year' in df.columns: + validation['year_range'] = { + 'min' : int(df['year'].min()) if not df['year'].isnull().all() else None, + 'max' : int(df['year'].max()) if not df['year'].isnull().all() else None, + 'unique_years': int(df['year'].nunique()) + } + for col in ('pillar', 'direction', 'source'): + if col in df.columns: + validation[f'{col}_breakdown'] = { + str(k): int(v) for k, v in df[col].value_counts().to_dict().items() + } + if 'indicator_standardized' in df.columns: + validation['unique_indicators'] = int(df['indicator_standardized'].nunique()) + if 'country' in df.columns: + validation['unique_countries'] = int(df['country'].nunique()) + + column_length_check = {} + for col, max_len in COLUMN_CONSTRAINTS.items(): + if col in df.columns: + max_actual = df[col].astype(str).str.len().max() + column_length_check[col] = { + 'max_length_constraint': max_len, + 'max_actual_length' : int(max_actual), + 'within_limit' : bool(max_actual <= max_len) + } + validation['column_length_check'] = column_length_check + return validation + + # ------------------------------------------------------------------ # RUN - # ========================================================================= + # ------------------------------------------------------------------ - def run(self): - start = datetime.now() - self.logger.info("\n" + "=" * 70) - self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold") - self.logger.info(f" Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info( - " NORMALISASI: norm_value dari analytical_layer (satu referensi global)." - "\n Tidak ada rescaling per-framework. MDGs dan SDGs comparable." + def run(self, df: pd.DataFrame) -> int: + """ + Execute full cleaning pipeline -> load ke STAGING (Silver). + + Returns: + int: Rows loaded + """ + self.metadata['start_time'] = datetime.now() + self.metadata['rows_fetched'] = len(df) + + if df.empty: + print(" ERROR: DataFrame is empty, nothing to process.") + return 0 + + df = self._step_standardize_countries(df) + df = self._step_remove_missing(df) + df = self._step_remove_duplicates(df) + df = self._step_add_classifications(df) + df = self._step_apply_constraints(df) + + self.metadata['rows_transformed'] = len(df) + + validation = self.validate_data(df) + self.metadata['validation_metrics'] = validation + + all_within_limits = all( + info['within_limit'] + for info in validation.get('column_length_check', {}).values() ) - self.logger.info("=" * 70) + if not all_within_limits: + print("\n WARNING: Some columns still exceed length constraints!") + for col, info in validation['column_length_check'].items(): + if not info['within_limit']: + print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}") - self.load_data() - self._classify_indicators() + print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...") + rows_loaded = load_to_bigquery( + self.client, df, self.table_name, + layer='silver', + write_disposition="WRITE_TRUNCATE", + schema=self.SCHEMA + ) + self.metadata['rows_loaded'] = rows_loaded - df_pillar_composite = self.calc_pillar_composite() - df_pillar_by_country = self.calc_pillar_by_country() - df_framework_by_country = self.calc_framework_by_country() - df_framework_asean = self.calc_framework_asean() - self.calc_narrative_overview(df_framework_asean=df_framework_asean, df_framework_by_country=df_framework_by_country) - self.calc_narrative_pillar(df_pillar_composite=df_pillar_composite, df_pillar_by_country=df_pillar_by_country) + log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded) - duration = (datetime.now() - start).total_seconds() - total_rows = sum(m["rows_loaded"] for m in self.load_metadata.values()) + self.metadata['end_time'] = datetime.now() + self.metadata['duration_seconds'] = ( + self.metadata['end_time'] - self.metadata['start_time'] + ).total_seconds() + self.metadata['execution_timestamp'] = self.metadata['start_time'] + self.metadata['completeness_pct'] = validation.get('completeness_pct', 0) + self.metadata['config_snapshot'] = json.dumps({'load_mode': self.load_mode}) + self.metadata['validation_metrics'] = json.dumps(validation) + save_etl_metadata(self.client, self.metadata) - self.logger.info("\n" + "=" * 70) - self.logger.info("SELESAI") - self.logger.info("=" * 70) - self.logger.info(f" Durasi : {duration:.2f}s") - self.logger.info(f" Total rows : {total_rows:,}") - for tbl, meta in self.load_metadata.items(): - icon = "OK" if meta["status"] == "success" else "FAIL" - self.logger.info(f" [{icon}] {tbl:<35} {meta['rows_loaded']:>10,}") + print(f"\n Cleaned Integration completed: {rows_loaded:,} rows") + print(f" Duration : {self.metadata['duration_seconds']:.2f}s") + print(f" Completeness : {validation['completeness_pct']:.2f}%") + if 'year_range' in validation: + yr = validation['year_range'] + if yr['min'] and yr['max']: + print(f" Year range : {yr['min']}-{yr['max']}") + print(f" Indicators : {validation.get('unique_indicators', '-')}") + print(f" Countries : {validation.get('unique_countries', '-')}") + print(f"\n Schema Validation:") + for col, info in validation.get('column_length_check', {}).items(): + status = "OK" if info['within_limit'] else "FAIL" + print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}") + print(f"\n Metadata -> [AUDIT] etl_metadata") + + return rows_loaded # ============================================================================= -# AIRFLOW & MAIN +# AIRFLOW TASK FUNCTIONS # ============================================================================= -def run_aggregation(): +def run_cleaned_integration(): + """ + Airflow task: Load cleaned_integrated dari staging_integrated. + Dipanggil oleh DAG setelah task staging_integration_to_silver selesai. + """ from scripts.bigquery_config import get_bigquery_client - client = get_bigquery_client() - agg = FoodSecurityAggregator(client) - agg.run() - total = sum(m["rows_loaded"] for m in agg.load_metadata.values()) - print(f"Aggregation completed: {total:,} total rows loaded") + client = get_bigquery_client() + df_staging = load_staging_data(client) + loader = CleanedDataLoader(client, load_mode='full_refresh') + rows = loader.run(df_staging) + print(f"Cleaned layer loaded: {rows:,} rows") +# ============================================================================= +# MAIN EXECUTION +# ============================================================================= + if __name__ == "__main__": - import io - if _sys.stdout.encoding and _sys.stdout.encoding.lower() not in ("utf-8", "utf8"): - _sys.stdout = io.TextIOWrapper(_sys.stdout.buffer, encoding="utf-8", errors="replace") - if _sys.stderr.encoding and _sys.stderr.encoding.lower() not in ("utf-8", "utf8"): - _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") + print("=" * 60) + print("BIGQUERY CLEANED LAYER ETL") + print("Kimball DW Architecture") + print(" Input : STAGING (Silver) -> staging_integrated") + print(" Output : STAGING (Silver) -> cleaned_integrated") + print(" Audit : AUDIT -> etl_logs, etl_metadata") + print("=" * 60) - print("=" * 70) - print("FOOD SECURITY AGGREGATION -> fs_asean_gold") - print(f"Condition threshold: bad<{THRESHOLD_BAD}, moderate {THRESHOLD_BAD}-{THRESHOLD_GOOD}, good>{THRESHOLD_GOOD}") - print("NORMALISASI: satu referensi global per indikator (dari analytical_layer).") - print("Tidak ada rescaling per-framework. MDGs dan SDGs comparable.") - print("=" * 70) + logger = setup_logging() + client = get_bigquery_client() + df_staging = load_staging_data(client) - logger = setup_logging() - for handler in logger.handlers: - handler.__class__ = _SafeStreamHandler + print("\n[1/1] Cleaned Integration -> STAGING (Silver)...") + loader = CleanedDataLoader(client, load_mode='full_refresh') + final_count = loader.run(df_staging) - client = get_bigquery_client() - agg = FoodSecurityAggregator(client) - agg.run() - - print("\n" + "=" * 70) - print("[OK] SELESAI") - print("=" * 70) \ No newline at end of file + print("\n" + "=" * 60) + print("[OK] CLEANED LAYER ETL COMPLETED") + print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") + print(f" AUDIT : etl_logs, etl_metadata") + print("=" * 60) \ No newline at end of file