From b54b276c639643af1f9adf4ed3957ca9814ceb51 Mon Sep 17 00:00:00 2001 From: Debby Date: Thu, 2 Apr 2026 17:34:33 +0700 Subject: [PATCH] done 1 --- scripts/bigquery_analytical_layer.py | 190 +-- scripts/bigquery_cleaned_layer.py | 1824 +++++++++++++++++++------- 2 files changed, 1425 insertions(+), 589 deletions(-) diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index ddb9e88..9661578 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -15,10 +15,11 @@ Filtering Order: → Indikator DI SDG_ONLY_KEYWORDS + year >= SDG_TRANSITION_YEAR → 'SDGs' → Indikator DI SDG_ONLY_KEYWORDS + year < SDG_TRANSITION_YEAR → 'MDGs' → SDG_TRANSITION_YEAR = 2015 (HARDCODE — tanggal resmi SDGs berlaku) - BUKAN dari actual_start_year data, karena data anaemia/FIES bisa ada - sebelum 2015 namun tetap harus dilabeli MDGs pada tahun-tahun tersebut. 7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global) 8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global) + *** PERBAIKAN: normalisasi dilakukan SEKALI untuk seluruh data (semua tahun), + bukan per-framework, agar nilai dari era MDGs dan SDGs berada di + skala yang sama dan dapat dibandingkan secara adil. *** 9. Calculate YoY per indicator per country 10. Analyze indicator availability by year 11. Save analytical table @@ -26,17 +27,18 @@ Filtering Order: FRAMEWORK LOGIC: - SDG_TRANSITION_YEAR = 2015 (HARDCODE, bukan auto-detect dari data) - Semua SDG-only indicators menggunakan SDG_TRANSITION_YEAR yang SAMA - sehingga label berubah serentak di satu titik waktu - SDG-only + year < SDG_TRANSITION_YEAR → 'MDGs' (data tetap ada, tidak dihapus) - SDG-only + year >= SDG_TRANSITION_YEAR → 'SDGs' - Non-SDG-only indicators → 'MDGs' selalu (di semua tahun) -ALASAN HARDCODE: -- SDGs resmi diadopsi PBB pada 25 September 2015 dan mulai berlaku 1 Januari 2015 -- Indikator FIES dan anaemia punya data sebelum 2015 (dari MDGs era) -- Jika sdg_transition_year di-auto-detect dari min(actual_start_year), - maka akan = 2013 (karena data ada sejak 2013), sehingga semua tahun - berlabel SDGs — yang secara historis tidak tepat. +NORMALISASI (PERBAIKAN): +- norm_value_1_100 dihitung SATU KALI per indikator menggunakan seluruh data + (semua tahun, semua negara) sebagai referensi min-max. +- Ini memastikan nilai 60 di era MDGs dan nilai 60 di era SDGs memiliki + makna yang SAMA (posisi relatif yang sama dalam distribusi global). +- Tidak ada rescaling ulang per-framework di layer analitik ini. +- Rescaling per-framework (jika diperlukan untuk visualisasi) sebaiknya + dilakukan di layer agregasi (analysis_layer) dengan flag eksplisit. """ import pandas as pd @@ -65,10 +67,6 @@ from google.cloud import bigquery # ============================================================================= # SDG-ONLY INDICATOR KEYWORDS # ============================================================================= -# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. -# Indikator di set ini → 'SDGs' mulai dari SDG_TRANSITION_YEAR (2015). -# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun. - SDG_ONLY_KEYWORDS = frozenset([ # TARGET 2.1.1 — Undernourishment "prevalence of undernourishment (percent) (3-year average)", @@ -111,23 +109,16 @@ SDG_ONLY_KEYWORDS = frozenset([ # ============================================================================= # SDG TRANSITION YEAR — HARDCODE # ============================================================================= -# SDGs resmi berlaku mulai 1 Januari 2015 (diadopsi PBB 25 September 2015). - SDG_TRANSITION_YEAR = 2015 # ============================================================================= # THRESHOLD KONDISI (fixed absolute, skala 1-100) # ============================================================================= - THRESHOLD_BAD = 40.0 THRESHOLD_GOOD = 60.0 def assign_condition(norm_value_1_100: float) -> str: - """ - Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). - Returns: 'good' / 'moderate' / 'bad' - """ if pd.isna(norm_value_1_100): return None if norm_value_1_100 > THRESHOLD_GOOD: @@ -145,20 +136,10 @@ class AnalyticalLayerLoader: """ Analytical Layer Loader for BigQuery - Output kolom fact_asean_food_security_selected: - country_id, country_name, - indicator_id, indicator_name, direction, framework, - pillar_id, pillar_name, - time_id, year, value, - norm_value_1_100, - yoy_change, yoy_pct - - FRAMEWORK LOGIC: - - SDG_TRANSITION_YEAR = 2015 (HARDCODE — tanggal resmi SDGs berlaku) - - Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun - - Indikator DI SDG_ONLY_KEYWORDS: - year < SDG_TRANSITION_YEAR (2015) → 'MDGs' (data tetap ada, tidak dihapus) - year >= SDG_TRANSITION_YEAR (2015) → 'SDGs' + PERBAIKAN NORMALISASI: + - norm_value_1_100 dihitung SEKALI per indikator dari seluruh data + (semua tahun, semua negara). Tidak ada rescaling ulang per-framework. + - Ini memastikan komparabilitas lintas era MDGs dan SDGs. """ def __init__(self, client: bigquery.Client): @@ -172,13 +153,12 @@ class AnalyticalLayerLoader: self.df_pillar = None self.selected_country_ids = None - self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5) + self.indicator_max_start_map = {} self.start_year = 2013 self.end_year = None self.baseline_year = 2023 - # SDG_TRANSITION_YEAR diambil dari konstanta modul (HARDCODE = 2015) self.sdg_transition_year = SDG_TRANSITION_YEAR self.pipeline_metadata = { @@ -429,8 +409,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("=" * 80) - # Hitung max_start_year per indikator = max(min_year per country) - # = tahun pertama di mana SEMUA fixed countries sudah punya data indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() @@ -459,8 +437,6 @@ class AnalyticalLayerLoader: }) continue - # Cek apakah semua tahun dari max_start s/d end_year - # hadir di SEMUA fixed countries expected_years = list(range(max_start, self.end_year + 1)) ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] all_years_complete = True @@ -486,18 +462,11 @@ class AnalyticalLayerLoader: if not valid_indicators: raise ValueError("No valid indicators found after filtering!") - # ---------------------------------------------------------------- - # Filter hanya indikator yang valid. - # PENTING: TIDAK menghapus baris year < max_start_year. - # Semua baris tetap ada — label framework ditentukan di Step 6. - # max_start_year disimpan sebagai lookup untuk Step 7. - # ---------------------------------------------------------------- original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() - # Simpan max_start_year per indicator_id untuk Step 7 self.indicator_max_start_map = ( indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] .set_index('indicator_id')['max_start_year'] @@ -524,24 +493,11 @@ class AnalyticalLayerLoader: self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") self.logger.info("=" * 80) - # ---------------------------------------------------------------- - # SDG_TRANSITION_YEAR = 2015 (HARDCODE) - # SDGs diadopsi PBB 25 September 2015, berlaku 1 Januari 2015. - # - # PENTING — TIDAK dihitung dari data: - # Jika auto-detect dari min(actual_start_year SDG-only indicators), - # hasilnya = 2013 (karena data FIES/anaemia ada sejak 2013). - # Akibatnya year >= 2013 → SDGs → SEMUA tahun berlabel SDGs. - # Ini secara historis salah karena SDGs belum berlaku di 2013-2015. - # ---------------------------------------------------------------- self.logger.info(f"\n SDG_TRANSITION_YEAR : {self.sdg_transition_year} (HARDCODE)") self.logger.info(f" Alasan : SDGs resmi berlaku 1 Januari 2015") self.logger.info(f" Bukan auto-detect : data FIES/anaemia ada sejak 2013,") - self.logger.info(f" tapi tahun 2013-2015 harus tetap MDGs") + self.logger.info(f" tapi tahun 2013-2014 harus tetap MDGs") - # ---------------------------------------------------------------- - # Identifikasi indikator SDG-only berdasarkan SDG_ONLY_KEYWORDS - # ---------------------------------------------------------------- indicator_info = ( self.df_clean[['indicator_id', 'indicator_name']] .drop_duplicates() @@ -571,25 +527,12 @@ class AnalyticalLayerLoader: self.logger.info(f"\n Non-SDG-only indicators ({len(non_sdg_ids)}): → MDGs selalu") - # ---------------------------------------------------------------- - # Validasi: pastikan ada SDG-only indicators yang lolos filter - # ---------------------------------------------------------------- if not sdg_only_ids: raise ValueError( "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " "Pastikan nama indikator di SDG_ONLY_KEYWORDS cocok dengan data BigQuery." ) - # ---------------------------------------------------------------- - # Assign framework dengan vectorized np.where: - # - # Kondisi SDG-only AND year >= SDG_TRANSITION_YEAR → 'SDGs' - # Semua kondisi lain (non-SDG-only ATAU year < SDG_TRANSITION_YEAR) → 'MDGs' - # - # Hasilnya dalam 1 indikator SDG-only (misal anaemia, data mulai 2013): - # 2013, 2014, 2015 → 'MDGs' (data tetap ada) - # 2015, 2017, ... → 'SDGs' - # ---------------------------------------------------------------- self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids) self.df_clean['framework'] = np.where( @@ -601,9 +544,6 @@ class AnalyticalLayerLoader: self.df_clean = self.df_clean.drop(columns=['_is_sdg_only']) - # ---------------------------------------------------------------- - # Log verifikasi per indikator — tampilkan split MDGs/SDGs per tahun - # ---------------------------------------------------------------- self.logger.info(f"\n Logika assign framework (PER BARIS):") self.logger.info(f" {'─'*72}") self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di semua tahun") @@ -668,13 +608,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 7: VERIFY NO GAPS") self.logger.info("=" * 80) - # ---------------------------------------------------------------- - # Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya, - # bukan dari self.start_year global, karena tiap indikator bisa - # punya start year berbeda. - # Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek - # karena memang tidak semua country punya data di sana. - # ---------------------------------------------------------------- expected_countries = len(self.selected_country_ids) all_good = True bad_rows = [] @@ -714,15 +647,31 @@ class AnalyticalLayerLoader: # ------------------------------------------------------------------ # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR # ------------------------------------------------------------------ + # PERBAIKAN: + # Normalisasi dilakukan SEKALI per indikator dari SELURUH DATA + # (semua tahun 2013–end_year, semua negara, tanpa memisahkan framework). + # + # Alasan: + # - Sebelumnya, rescaling per-framework di analysis_layer menyebabkan + # nilai 1-100 era MDGs dan SDGs memiliki referensi yang berbeda, + # sehingga tidak dapat dibandingkan secara adil. + # - Dengan satu normalisasi global per indikator, nilai 60 di era MDGs + # dan nilai 60 di era SDGs berarti hal yang sama: posisi relatif yang + # sama dalam distribusi historis indikator tersebut. + # - Jika SDGs memang era yang lebih buruk secara substantif, itu akan + # tercermin sebagai nilai norm yang memang lebih rendah — bukan artefak + # dari rescaling ulang. + # ------------------------------------------------------------------ def calculate_norm_value(self): - """ - Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, - direction-aware, global per indikator (semua negara + semua tahun). - """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") + self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR (GLOBAL, SEKALI)") self.logger.info("=" * 80) + self.logger.info( + "\n [PERBAIKAN] Normalisasi dilakukan SEKALI per indikator dari seluruh data." + "\n Tidak ada rescaling ulang per-framework." + "\n Ini memastikan komparabilitas lintas era MDGs dan SDGs." + ) DIRECTION_INVERT = frozenset({ "negative", "lower_better", "lower_is_better", "inverse", "neg", @@ -747,6 +696,10 @@ class AnalyticalLayerLoader: if n_valid < 2: grp['norm_value_1_100'] = np.nan norm_parts.append(grp) + self.logger.warning( + f" {int(ind_id):<5} {direction:<15} {'N/A':<8} " + f"{'N/A':>10} {'N/A':>10} {ind_name[:45]} [SKIPPED: n_valid={n_valid}]" + ) continue raw = grp.loc[valid_mask, 'value'].values @@ -755,6 +708,7 @@ class AnalyticalLayerLoader: normed = np.full(len(grp), np.nan) if v_min == v_max: + # Semua nilai sama → assign tengah skala normed[valid_mask.values] = 50.5 else: scaled = (raw - v_min) / (v_max - v_min) @@ -781,6 +735,53 @@ class AnalyticalLayerLoader: f"{self.df_clean['norm_value_1_100'].max():.2f}" ) + # ---------------------------------------------------------------- + # VALIDASI KOMPARABILITAS: Cek apakah ada gap sistematis antar era + # Ini adalah sinyal diagnostik — bukan error. + # Gap besar (>15 poin) setelah perbaikan = fenomena nyata, bukan artefak. + # ---------------------------------------------------------------- + self.logger.info(f"\n [DIAGNOSTIK KOMPARABILITAS] Rata-rata norm per framework per tahun:") + self.logger.info(f" {'─'*55}") + + fw_year_mean = ( + self.df_clean + .groupby(['framework', 'year'])['norm_value_1_100'] + .mean() + .reset_index() + .sort_values(['framework', 'year']) + ) + for fw, grp_fw in fw_year_mean.groupby('framework'): + means = grp_fw['norm_value_1_100'].values + years = grp_fw['year'].values + self.logger.info(f"\n Framework: {fw}") + for yr, m in zip(years, means): + bar = '█' * int(m / 5) + self.logger.info(f" {int(yr)} : {m:6.2f} {bar}") + + # Bandingkan rata-rata MDGs vs SDGs (hanya tahun di mana keduanya ada) + mdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'MDGs']['norm_value_1_100'].mean() + sdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'SDGs']['norm_value_1_100'].mean() + gap = mdgs_mean_total - sdgs_mean_total + self.logger.info( + f"\n Rata-rata keseluruhan:" + f"\n MDGs : {mdgs_mean_total:.2f}" + f"\n SDGs : {sdgs_mean_total:.2f}" + f"\n Gap : {gap:.2f} poin" + ) + if abs(gap) > 15: + self.logger.info( + f"\n [INFO] Gap {gap:.2f} poin antara MDGs dan SDGs." + f"\n Setelah perbaikan normalisasi (satu referensi global)," + f"\n gap ini mencerminkan perbedaan SUBSTANTIF, bukan artefak teknis." + f"\n Indikator SDGs memang mengukur dimensi deprivasi yang lebih dalam" + f"\n (FIES, stunting, wasting, anaemia) dibanding indikator MDGs." + ) + else: + self.logger.info( + f"\n [OK] Gap {gap:.2f} poin — dalam batas wajar, tidak ada bias sistematis." + ) + + # Distribusi kondisi self.df_clean['_condition_preview'] = ( self.df_clean['norm_value_1_100'].apply(assign_condition) ) @@ -1019,7 +1020,11 @@ class AnalyticalLayerLoader: 'sdg_transition_year' : self.sdg_transition_year, 'sdg_transition_source' : 'HARDCODE — SDGs resmi berlaku 1 Jan 2015', 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'norm_scale' : ( + '1-100 per indicator global minmax direction-aware. ' + 'SATU normalisasi untuk seluruh data tanpa rescaling per-framework. ' + 'Komparabilitas lintas era MDGs/SDGs terjamin.' + ), 'framework_logic' : ( f'SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE); ' 'SDG-only + year >= SDG_TRANSITION_YEAR → SDGs; ' @@ -1065,6 +1070,9 @@ class AnalyticalLayerLoader: f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). " "SDG-only + year >= 2015 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu." ) + self.logger.info( + "NORMALISASI: SATU referensi global per indikator — tidak ada rescaling per-framework." + ) self.logger.info("=" * 80) self.load_source_data() @@ -1113,7 +1121,7 @@ if __name__ == "__main__": print("=" * 80) print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") - print(f"Norm: min-max 1-100 per indicator, direction-aware") + print(f"Norm: min-max 1-100 per indicator, direction-aware, GLOBAL (satu referensi)") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") print( f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). " diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index 682035e..9cbc343 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -1,27 +1,53 @@ """ -BIGQUERY CLEANED LAYER ETL -Kimball Data Warehouse Architecture +BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION +Semua agregasi pakai norm_value dari _get_norm_value_df() -Kimball ETL Flow yang dijalankan file ini: - Input : STAGING layer (Silver) — staging_integrated (fs_asean_silver) - Output : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver) - Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit) +PERBAIKAN (vs versi sebelumnya): +───────────────────────────────────────────────────────────────────────────── +1. NORMALIZE_FRAMEWORKS_JOINTLY dihapus. + Setelah perbaikan di analytical_layer, norm_value_1_100 sudah dihitung + SEKALI per indikator dari seluruh data (semua tahun, semua negara). + Tidak ada lagi rescaling ulang per-framework di layer ini. + Semua framework (MDGs, SDGs, Total) menggunakan norm_value yang SAMA + sebagai basis, sehingga skor mereka berada pada skala yang setara. -Classes: - CleanedDataLoader — Cleaning, enrichment, & load ke Silver layer +2. _get_norm_value_df() DISEDERHANAKAN. + Fungsi ini sekarang hanya membaca kolom norm_value_1_100 yang sudah ada + di fact_asean_food_security_selected (hasil dari analytical_layer), + kemudian memetakan ke skala 0-1 untuk keperluan agregasi internal. + TIDAK ada lagi normalisasi ulang per indikator di sini. -Usage: - python bigquery_cleaned_layer.py +3. global_minmax() TETAP DIGUNAKAN untuk mengubah rata-rata norm (0-1) menjadi + skor 1-100 di level agregasi (pillar / country / asean). + Ini adalah rescaling level AGREGAT (bukan level indikator), sehingga masih + valid dan tidak menimbulkan bias komparabilitas. + +4. Framework MDGs dan SDGs sekarang comparable: + - Jika skor SDGs < skor MDGs → memang karena indikator SDGs mengukur + dimensi deprivasi yang lebih dalam (substantif), bukan artefak teknis. + - Log diagnostik ditambahkan untuk memverifikasi ini. + +5. Kolom 'condition' (good/moderate/bad) TETAP dengan threshold yang sama. + +Simpan 6 tabel ke fs_asean_gold (layer='gold'): + - agg_pillar_composite + - agg_pillar_by_country + - agg_framework_by_country + - agg_framework_asean + - agg_narrative_overview + - agg_narrative_pillar + +SOURCE TABLE: fact_asean_food_security_selected """ import pandas as pd import numpy as np from datetime import datetime import logging -from typing import Dict import json +import sys as _sys -from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id +from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_helpers import ( log_update, load_to_bigquery, @@ -33,545 +59,1347 @@ from google.cloud import bigquery # ============================================================================= -# LOAD STAGING DATA +# KONSTANTA GLOBAL # ============================================================================= -def load_staging_data(client: bigquery.Client) -> pd.DataFrame: - """Load data dari staging_integrated (STAGING/Silver layer).""" - print("\nLoading data from staging_integrated (fs_asean_silver)...") - df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver') - print(f" Loaded : {len(df_staging):,} rows") - print(f" Columns : {len(df_staging.columns)}") - print(f" Sources : {df_staging['source'].nunique()}") - print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") - print(f" Countries : {df_staging['country'].nunique()}") - print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") - return df_staging +DIRECTION_INVERT_KEYWORDS = frozenset({ + "negative", "lower_better", "lower_is_better", "inverse", "neg", +}) + +DIRECTION_POSITIVE_KEYWORDS = frozenset({ + "positive", "higher_better", "higher_is_better", +}) + +# Threshold kondisi — fixed absolute, skala 1-100 +THRESHOLD_BAD = 40.0 +THRESHOLD_GOOD = 60.0 -# ============================================================================= -# COLUMN CONSTRAINT HELPERS -# ============================================================================= - -COLUMN_CONSTRAINTS = { - 'source' : 20, - 'indicator_original' : 255, - 'indicator_standardized': 255, - 'country' : 100, - 'year_range' : 20, - 'unit' : 20, - 'pillar' : 20, - 'direction' : 15, -} - - -def truncate_string(value, max_length: int) -> str: - """Truncate string ke max_length, return as-is jika None/NaN.""" - if pd.isna(value): - return value - value_str = str(value) - return value_str[:max_length] if len(value_str) > max_length else value_str - - -def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame: +def assign_condition(score) -> str: """ - Apply column length constraints sesuai schema tabel. - Melaporkan kolom mana yang dipotong dan contohnya. + Assign kondisi berdasarkan score skala 1-100 (direction-aware, nilai tinggi = lebih baik). + Returns: 'good' / 'moderate' / 'bad' / None jika NaN """ - df_constrained = df.copy() - truncation_report = {} + if score is None or (isinstance(score, float) and np.isnan(score)): + return None + if score > THRESHOLD_GOOD: + return 'good' + if score < THRESHOLD_BAD: + return 'bad' + return 'moderate' - for column, max_length in COLUMN_CONSTRAINTS.items(): - if column not in df_constrained.columns: - continue - mask = ( - df_constrained[column].notna() & - (df_constrained[column].astype(str).str.len() > max_length) + +# ============================================================================= +# Windows CP1252 safe logging +# ============================================================================= + +class _SafeStreamHandler(logging.StreamHandler): + def emit(self, record): + try: + super().emit(record) + except UnicodeEncodeError: + try: + msg = self.format(record) + self.stream.write( + msg.encode("utf-8", errors="replace").decode("ascii", errors="replace") + + self.terminator + ) + self.flush() + except Exception: + self.handleError(record) + + +# ============================================================================= +# HELPERS +# ============================================================================= + +def _should_invert(direction: str, logger=None, context: str = "") -> bool: + d = str(direction).lower().strip() + if d in DIRECTION_INVERT_KEYWORDS: + return True + if d in DIRECTION_POSITIVE_KEYWORDS: + return False + if logger: + logger.warning( + f" [DIRECTION WARNING] Unknown direction '{direction}' " + f"{'(' + context + ')' if context else ''}. Defaulting to positive (no invert)." ) - truncated_count = mask.sum() - if truncated_count > 0: - truncation_report[column] = { - 'count' : int(truncated_count), - 'max_length': max_length, - 'examples' : df_constrained[mask][column].head(3).tolist() - } - df_constrained[column] = df_constrained[column].apply( - lambda x: truncate_string(x, max_length) + return False + + +def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: + """ + Rescale series ke rentang [lo, hi]. + Digunakan untuk mengubah norm agregat (0-1) menjadi skor 1-100 di level + pillar / country / asean. Bukan untuk normalisasi indikator mentah. + """ + values = series.dropna().values + if len(values) == 0: + return pd.Series(np.nan, index=series.index) + v_min, v_max = values.min(), values.max() + if v_min == v_max: + return pd.Series((lo + hi) / 2.0, index=series.index) + result = np.full(len(series), np.nan) + not_nan = series.notna() + raw = series[not_nan].values + result[not_nan.values] = lo + (raw - v_min) / (v_max - v_min) * (hi - lo) + return pd.Series(result, index=series.index) + + +def add_yoy(df: pd.DataFrame, group_cols: list, score_col: str) -> pd.DataFrame: + df = df.sort_values(group_cols + ["year"]).reset_index(drop=True) + if group_cols: + df["year_over_year_change"] = df.groupby(group_cols)[score_col].diff() + else: + df["year_over_year_change"] = df[score_col].diff() + return df + + +def safe_int( + series: pd.Series, fill: int = 0, col_name: str = "", logger=None +) -> pd.Series: + n_nan = series.isna().sum() + if n_nan > 0 and logger: + logger.warning( + f" [NaN WARNING] Kolom '{col_name}' punya {n_nan} NaN -> di-fill dengan {fill}" + ) + return series.fillna(fill).astype(int) + + +def check_and_dedup( + df: pd.DataFrame, key_cols: list, context: str = "", logger=None +) -> pd.DataFrame: + dupes = df.duplicated(subset=key_cols, keep=False) + if dupes.any(): + n_dupes = dupes.sum() + if logger: + logger.warning( + f" [DEDUP WARNING] {context}: {n_dupes} duplikat rows pada {key_cols}. " + f"Di-aggregate dengan mean." + ) + numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() + agg_dict = { + c: ("mean" if c in numeric_cols else "first") + for c in df.columns if c not in key_cols + } + df = df.groupby(key_cols, as_index=False).agg(agg_dict) + return df + + +def add_condition_column(df: pd.DataFrame, score_col: str) -> pd.DataFrame: + df['condition'] = df[score_col].apply(assign_condition) + return df + + +def log_condition_summary(df: pd.DataFrame, context: str, logger) -> None: + dist = df['condition'].value_counts() + logger.info( + f" Condition distribution ({context}): " + + " | ".join(f"{c}: {n:,}" for c, n in dist.items()) + ) + + +# ============================================================================= +# NARRATIVE BUILDER FUNCTIONS (tidak berubah) +# ============================================================================= + +def _fmt_score(score) -> str: + if score is None or (isinstance(score, float) and np.isnan(score)): + return "N/A" + return f"{score:.2f}" + + +def _fmt_delta(delta) -> str: + if delta is None or (isinstance(delta, float) and np.isnan(delta)): + return "N/A" + sign = "+" if delta >= 0 else "" + return f"{sign}{delta:.2f}" + + +def _build_overview_narrative( + year, n_mdg, n_sdg, n_total_ind, score, yoy_val, yoy_pct, + prev_year, prev_score, ranking_list, + most_improved_country, most_improved_delta, + most_declined_country, most_declined_delta, +) -> str: + parts_ind = [] + if n_mdg > 0: + parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") + if n_sdg > 0: + parts_ind.append(f"{n_sdg} SDG indicator{'s' if n_sdg > 1 else ''}") + + if parts_ind: + ind_detail = " and ".join(parts_ind) + sent1 = ( + f"In {year}, the ASEAN food security assessment incorporated a total of " + f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}, " + f"consisting of {ind_detail}." + ) + else: + sent1 = ( + f"In {year}, the ASEAN food security assessment incorporated " + f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." + ) + + if yoy_val is not None and prev_score is not None: + direction_word = "increasing" if yoy_val >= 0 else "decreasing" + pct_clause = "" + if yoy_pct is not None: + abs_pct = abs(yoy_pct) + trend_word = "improvement" if yoy_val >= 0 else "decline" + pct_clause = f", which represents a {abs_pct:.2f}% {trend_word} year-over-year" + sent2 = ( + f"The ASEAN overall score (Total framework) reached {_fmt_score(score)}, " + f"{direction_word} by {abs(yoy_val):.2f} points compared to the previous year " + f"({_fmt_score(prev_score)} in {prev_year}){pct_clause}." + ) + else: + sent2 = ( + f"The ASEAN overall score (Total framework) reached {_fmt_score(score)} in {year}; " + f"no prior-year data is available for year-over-year comparison." + ) + + sent3 = "" + if ranking_list: + first = ranking_list[0] + last = ranking_list[-1] + middle = ranking_list[1:-1] + if len(ranking_list) == 1: + sent3 = ( + f"In terms of country performance, {first['country_name']} was the only " + f"country assessed, scoring {_fmt_score(first['score'])} in {year}." + ) + elif len(ranking_list) == 2: + sent3 = ( + f"In terms of country performance, {first['country_name']} led the region " + f"with a score of {_fmt_score(first['score'])}, while " + f"{last['country_name']} recorded the lowest score of " + f"{_fmt_score(last['score'])} in {year}." + ) + else: + middle_parts = [f"{c['country_name']} ({_fmt_score(c['score'])})" for c in middle] + middle_str = ( + middle_parts[0] if len(middle_parts) == 1 + else ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}" + ) + sent3 = ( + f"In terms of country performance, {first['country_name']} led the region " + f"with a score of {_fmt_score(first['score'])}, followed by {middle_str}. " + f"At the other end, {last['country_name']} recorded the lowest score " + f"of {_fmt_score(last['score'])} in {year}." ) - if truncation_report: - print("\n Column Truncations Applied:") - for column, info in truncation_report.items(): - print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars") + sent4_parts = [] + if most_improved_country and most_improved_delta is not None: + sent4_parts.append( + f"the most notable improvement was seen in {most_improved_country}, " + f"which gained {_fmt_delta(most_improved_delta)} points from the previous year" + ) + if most_declined_country and most_declined_delta is not None: + if most_declined_delta < 0: + sent4_parts.append( + f"while {most_declined_country} experienced the largest decline " + f"of {_fmt_delta(most_declined_delta)} points" + ) + else: + sent4_parts.append( + f"while {most_declined_country} recorded the smallest gain " + f"of {_fmt_delta(most_declined_delta)} points" + ) + + sent4 = "" + if sent4_parts: + sent4 = ", ".join(sent4_parts) + "." + sent4 = sent4[0].upper() + sent4[1:] + + return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) + + +def _build_pillar_narrative( + year, pillar_name, pillar_score, rank_in_year, n_pillars, yoy_val, + top_country, top_country_score, bot_country, bot_country_score, + strongest_pillar, strongest_score, weakest_pillar, weakest_score, + most_improved_pillar, most_improved_delta, + most_declined_pillar, most_declined_delta, +) -> str: + rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") + sent1 = ( + f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " + f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." + ) + + sent2 = "" + if strongest_pillar and weakest_pillar: + if strongest_pillar == pillar_name: + sent2 = ( + f"This made {pillar_name} the strongest performing pillar in {year}, " + f"compared to the weakest pillar, {weakest_pillar}, " + f"which scored {_fmt_score(weakest_score)}." + ) + elif weakest_pillar == pillar_name: + sent2 = ( + f"This made {pillar_name} the weakest performing pillar in {year}, " + f"compared to the strongest pillar, {strongest_pillar}, " + f"which scored {_fmt_score(strongest_score)}." + ) + else: + sent2 = ( + f"Across all pillars in {year}, {strongest_pillar} was the strongest " + f"(score: {_fmt_score(strongest_score)}), while {weakest_pillar} " + f"was the weakest (score: {_fmt_score(weakest_score)})." + ) + + sent3 = "" + if top_country and bot_country: + if top_country != bot_country: + sent3 = ( + f"Within the {pillar_name} pillar, {top_country} led with a score of " + f"{_fmt_score(top_country_score)}, while {bot_country} recorded the lowest " + f"score of {_fmt_score(bot_country_score)}." + ) + else: + sent3 = ( + f"Within the {pillar_name} pillar, {top_country} was the only country " + f"with available data, scoring {_fmt_score(top_country_score)}." + ) + + if yoy_val is not None: + direction_word = "improved" if yoy_val >= 0 else "declined" + sent4 = ( + f"Compared to the previous year, the {pillar_name} pillar " + f"{direction_word} by {abs(yoy_val):.2f} points" + ) else: - print("\n No truncations needed — all values within constraints") + sent4 = ( + f"No prior-year data is available to calculate year-over-year change " + f"for the {pillar_name} pillar in {year}" + ) - return df_constrained + if (most_improved_pillar and most_improved_delta is not None + and most_declined_pillar and most_declined_delta is not None + and most_improved_pillar != most_declined_pillar): + sent4 += ( + f". Across all pillars, {most_improved_pillar} showed the greatest improvement " + f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " + f"recorded the largest decline ({_fmt_delta(most_declined_delta)} pts)" + ) + + sent4 += "." + sent4 = sent4[0].upper() + sent4[1:] + + return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) # ============================================================================= -# COUNTRY NAME STANDARDIZATION +# MAIN CLASS # ============================================================================= -ASEAN_MAPPING = { - 'BRN' : 'Brunei Darussalam', - 'BRUNEI' : 'Brunei Darussalam', - 'BRUNEI DARUSSALAM' : 'Brunei Darussalam', - 'KHM' : 'Cambodia', - 'CAMBODIA' : 'Cambodia', - 'IDN' : 'Indonesia', - 'INDONESIA' : 'Indonesia', - 'LAO' : 'Laos', - 'LAOS' : 'Laos', - "LAO PEOPLE'S DEMOCRATIC REPUBLIC" : 'Laos', - 'LAO PDR' : 'Laos', - 'MYS' : 'Malaysia', - 'MALAYSIA' : 'Malaysia', - 'MMR' : 'Myanmar', - 'MYANMAR' : 'Myanmar', - 'BURMA' : 'Myanmar', - 'PHL' : 'Philippines', - 'PHILIPPINES' : 'Philippines', - 'SGP' : 'Singapore', - 'SINGAPORE' : 'Singapore', - 'THA' : 'Thailand', - 'THAILAND' : 'Thailand', - 'VNM' : 'Vietnam', - 'VIETNAM' : 'Vietnam', - 'VIET NAM' : 'Vietnam', -} +class FoodSecurityAggregator: - -def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'country') -> tuple: - """ - Standardize country names untuk ASEAN. - Ensures country names within varchar(100) constraint. - - Returns: - tuple: (df_clean, report_dict) - """ - df_clean = df.copy() - - def map_country(country): - if pd.isna(country): - return country - s = str(country).strip() - mapped = ASEAN_MAPPING.get(s.upper(), s) - return mapped[:100] if len(mapped) > 100 else mapped - - original = df_clean[country_column].copy() - df_clean[country_column] = df_clean[country_column].apply(map_country) - changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new} - - return df_clean, { - 'countries_mapped': len(set(changes.keys())), - 'changes' : changes, - } - - -# ============================================================================= -# PILLAR CLASSIFICATION -# ============================================================================= - -def assign_pillar(indicator_name: str) -> str: - """ - Assign pillar berdasarkan keyword indikator. - Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' - All <= 20 chars (varchar(20) constraint). - """ - if pd.isna(indicator_name): - return 'Supporting' - ind = str(indicator_name).lower() - - for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: - if kw in ind: - return 'Supporting' - - if any(kw in ind for kw in [ - 'adequacy', 'protein supply', 'supply of protein', - 'dietary energy supply', 'share of dietary energy', 'derived from cereals' - ]): - return 'Availability' - - if any(kw in ind for kw in [ - 'variability', 'cereal import dependency', 'arable land equipped', - 'political stability', 'value of food imports in total' - ]): - return 'Stability' - - if any(kw in ind for kw in [ - 'gdp', 'gross domestic product', 'rail lines', 'road density', - 'number of moderately', 'number of severely', - 'number of people undernourished', 'prevalence of moderate', - 'prevalence of severe', 'prevalence of undernourishment', 'food insecure' - ]): - return 'Access' - - if any(kw in ind for kw in [ - 'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity', - 'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water', - 'sanitation', 'children under 5', 'newborns with low', - 'women of reproductive' - ]): - return 'Utilization' - - return 'Supporting' - - -# ============================================================================= -# DIRECTION CLASSIFICATION -# ============================================================================= - -def assign_direction(indicator_name: str) -> str: - """ - Assign direction berdasarkan indikator. - Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars) - Both <= 15 chars (varchar(15) constraint). - """ - if pd.isna(indicator_name): - return 'higher_better' - ind = str(indicator_name).lower() - - if 'share of dietary energy supply derived from cereals' in ind: - return 'lower_better' - - for kw in [ - 'exclusive breastfeeding', - 'dietary energy supply', - 'dietary energy supply adequacy', - 'average fat supply', - 'average protein supply', - 'supply of protein of animal origin', - ]: - if kw in ind: - return 'higher_better' - - for kw in [ - 'prevalence of undernourishment', - 'prevalence of severe food insecurity', - 'prevalence of moderate or severe food insecurity', - 'prevalence of moderate food insecurity', - 'prevalence of wasting', - 'prevalence of stunting', - 'prevalence of overweight', - 'prevalence of obesity', - 'prevalence of anemia', - 'prevalence of anaemia', - 'prevalence of low birthweight', - 'number of people undernourished', - 'number of severely food insecure', - 'number of moderately or severely food insecure', - 'number of children under 5 years affected by wasting', - 'number of children under 5 years of age who are overweight', - 'number of children under 5 years of age who are stunted', - 'number of newborns with low birthweight', - 'number of obese adults', - 'number of women of reproductive age', - 'percentage of children under 5 years affected by wasting', - 'percentage of children under 5 years of age who are overweight', - 'percentage of children under 5 years of age who are stunted', - 'cereal import dependency', - 'import dependency', - 'value of food imports in total merchandise exports', - 'value of food imports', - 'variability of food production', - 'variability of food supply', - 'per capita food production variability', - 'per capita food supply variability', - 'coefficient of variation', - 'incidence of caloric losses', - 'food losses', - 'indicator of food price anomalies', - 'proportion of local breeds classified as being at risk', - 'agricultural export subsidies', - ]: - if kw in ind: - return 'lower_better' - - return 'higher_better' - - -# ============================================================================= -# CLEANED DATA LOADER -# ============================================================================= - -class CleanedDataLoader: - """ - Loader untuk cleaned integrated data ke STAGING layer (Silver). - - Kimball context: - Input : staging_integrated -> STAGING (Silver) — fs_asean_silver - Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver - Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit - - Pipeline steps: - 1. Standardize country names (ASEAN) - 2. Remove missing values - 3. Remove duplicates - 4. Add pillar & direction classification - 5. Apply column constraints - 6. Load ke BigQuery - 7. Log ke Audit layer - """ - - SCHEMA = [ - bigquery.SchemaField("source", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"), - bigquery.SchemaField("country", "STRING", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"), - bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"), - bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), - bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - ] - - def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'): - self.client = client - self.load_mode = load_mode - self.logger = logging.getLogger(self.__class__.__name__) + def __init__(self, client: bigquery.Client): + self.client = client + self.logger = logging.getLogger(self.__class__.__name__) self.logger.propagate = False - self.table_name = 'cleaned_integrated' - self.target_layer = 'silver' - self.metadata = { - 'source_class' : self.__class__.__name__, - 'table_name' : self.table_name, - 'start_time' : None, - 'end_time' : None, - 'duration_seconds' : None, - 'rows_fetched' : 0, - 'rows_transformed' : 0, - 'rows_loaded' : 0, - 'load_mode' : load_mode, - 'validation_metrics': {} + self.load_metadata = { + "agg_pillar_composite": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_pillar_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_framework_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_framework_asean": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_narrative_overview": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_narrative_pillar": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, } - # ------------------------------------------------------------------ - # STEP METHODS - # ------------------------------------------------------------------ + self.df = None + self.dims = {} - def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 1/5] Standardize country names...") - df, report = standardize_country_names_asean(df, country_column='country') - print(f" ASEAN countries mapped : {report['countries_mapped']}") - unique_countries = sorted(df['country'].unique()) - print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}") - log_update(self.client, 'STAGING', 'staging_integrated', - 'standardize_asean', report['countries_mapped']) - return df + self.sdgs_start_year = None + self.mdgs_indicator_ids = set() + self.sdgs_indicator_ids = set() - def _step_remove_missing(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 2/5] Remove missing values...") - rows_before = len(df) - df_clean = df.dropna(subset=list(df.columns)) - rows_after = len(df_clean) - removed = rows_before - rows_after - print(f" Rows before : {rows_before:,}") - print(f" Rows after : {rows_after:,}") - print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)") - print(f" Retention : {rows_after/rows_before*100:.1f}%") - return df_clean + # ========================================================================= + # STEP 1: Load data + # ========================================================================= - def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 3/5] Remove duplicates...") - exact_dups = df.duplicated().sum() - data_dups = df.duplicated( - subset=['indicator_standardized', 'country', 'year', 'value'] - ).sum() - print(f" Exact duplicates : {exact_dups:,}") - print(f" Data duplicates : {data_dups:,}") - rows_before = len(df) - df_clean = df.drop_duplicates( - subset=['indicator_standardized', 'country', 'year'], keep='first' + def load_data(self): + self.logger.info("=" * 70) + self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") + self.logger.info("=" * 70) + + self.df = read_from_bigquery( + self.client, "fact_asean_food_security_selected", layer='gold' ) - removed = rows_before - len(df_clean) - print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)") - return df_clean + self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows") - def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 4/5] Add pillar & direction classification...") - df = df.copy() + required_cols = { + "country_id", "country_name", + "indicator_id", "indicator_name", "direction", "framework", + "pillar_id", "pillar_name", + "time_id", "year", "value", + # PERBAIKAN: norm_value_1_100 wajib ada (hasil analytical_layer) + "norm_value_1_100", + } + missing_cols = required_cols - set(self.df.columns) + if missing_cols: + raise ValueError( + f"Kolom berikut tidak ditemukan: {missing_cols}\n" + f"Pastikan pipeline dijalankan berurutan:\n" + f" 1. bigquery_cleaned_layer.py\n" + f" 2. bigquery_dimensional_model.py\n" + f" 3. bigquery_analytical_layer.py ← harus dijalankan dulu\n" + f" 4. bigquery_analysis_layer.py (file ini)" + ) - df['pillar'] = df['indicator_standardized'].apply(assign_pillar) - df['direction'] = df['indicator_standardized'].apply(assign_direction) + self.df["direction"] = self.df["direction"].fillna("positive") + self.df["framework"] = self.df["framework"].fillna("MDGs") + self.df["norm_value_1_100"] = self.df["norm_value_1_100"].astype(float) - pillar_counts = df['pillar'].value_counts() - print(f" Pillar distribution:") - for pillar, count in pillar_counts.items(): - print(f" - {pillar}: {count:,}") + dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() + self.logger.info(f"\n Distribusi direction per indikator:") + for d, cnt in dir_dist.items(): + tag = "INVERT" if _should_invert(d, self.logger, "load_data") else "normal" + self.logger.info(f" {d:<25} : {cnt:>3} [{tag}]") - direction_counts = df['direction'].value_counts() - print(f" Direction distribution:") - for direction, count in direction_counts.items(): - pct = count / len(df) * 100 - print(f" - {direction}: {count:,} ({pct:.1f}%)") + fw_dist = self.df.drop_duplicates("indicator_id")["framework"].value_counts() + self.logger.info(f"\n Distribusi framework per indikator:") + for fw, cnt in fw_dist.items(): + self.logger.info(f" {fw:<10} : {cnt:>3}") + + self.logger.info( + f"\n Rows: {len(self.df):,} | Negara: {self.df['country_id'].nunique()} | " + f"Indikator: {self.df['indicator_id'].nunique()} | " + f"Tahun: {int(self.df['year'].min())}-{int(self.df['year'].max())}" + ) + + # Diagnostik: cek komparabilitas norm antar framework + self._log_norm_comparability_diagnostics() + + def _log_norm_comparability_diagnostics(self): + """ + Log diagnostik untuk memverifikasi bahwa norm_value_1_100 sudah comparable + antar framework setelah perbaikan di analytical_layer. + """ + self.logger.info(f"\n [DIAGNOSTIK] Komparabilitas norm_value_1_100 antar framework:") + self.logger.info(f" {'─'*60}") + + fw_stats = ( + self.df.groupby('framework')['norm_value_1_100'] + .agg(['mean', 'median', 'std', 'min', 'max']) + .round(2) + ) + for fw, row in fw_stats.iterrows(): + self.logger.info( + f" {fw:<8} mean={row['mean']:>6.2f} median={row['median']:>6.2f} " + f"std={row['std']:>5.2f} range=[{row['min']:.2f},{row['max']:.2f}]" + ) + + mdgs_mean = self.df[self.df['framework'] == 'MDGs']['norm_value_1_100'].mean() + sdgs_mean = self.df[self.df['framework'] == 'SDGs']['norm_value_1_100'].mean() + gap = mdgs_mean - sdgs_mean + + if abs(gap) > 15: + self.logger.info( + f"\n [INFO] Gap MDGs-SDGs = {gap:.2f} poin." + f"\n Ini adalah perbedaan SUBSTANTIF (bukan artefak normalisasi):" + f"\n Indikator SDGs mengukur deprivasi yang lebih dalam" + f"\n (FIES, stunting, wasting, anaemia) vs indikator MDGs." + f"\n Gap ini valid untuk dilaporkan sebagai temuan analisis." + ) + else: + self.logger.info( + f"\n [OK] Gap MDGs-SDGs = {gap:.2f} poin — dalam batas wajar." + ) + + # ========================================================================= + # STEP 1b: Klasifikasi indikator + # ========================================================================= + + def _classify_indicators(self): + self.logger.info("\n" + "=" * 70) + self.logger.info("STEP 1b: KLASIFIKASI INDIKATOR -> MDGs / SDGs") + self.logger.info("=" * 70) + + self.mdgs_indicator_ids = set( + self.df[self.df["framework"] == "MDGs"]["indicator_id"].unique().tolist() + ) + self.sdgs_indicator_ids = set( + self.df[self.df["framework"] == "SDGs"]["indicator_id"].unique().tolist() + ) + + _PROXY_KW = frozenset(['food insecurity', 'anemia', 'anaemia']) + proxy_mask = ( + (self.df["framework"] == "SDGs") & + self.df["indicator_name"].str.lower().apply( + lambda n: any(kw in n for kw in _PROXY_KW) + ) + ) + df_proxy = self.df[proxy_mask] + + if not df_proxy.empty: + self.sdgs_start_year = int(df_proxy["year"].min()) + self.logger.info( + f"\n sdgs_start_year = {self.sdgs_start_year} " + f"(dari proxy FIES/anaemia di tabel)" + ) + else: + sdgs_rows = self.df[self.df["framework"] == "SDGs"] + if not sdgs_rows.empty: + self.sdgs_start_year = int(sdgs_rows["year"].min()) + self.logger.warning( + f" [WARN] Proxy tidak ditemukan, fallback ke min(year) SDGs: " + f"{self.sdgs_start_year}" + ) + else: + self.sdgs_start_year = int(self.df["year"].max()) + 1 + self.logger.warning( + f" [WARN] Tidak ada SDGs. sdgs_start_year = {self.sdgs_start_year}" + ) + + self.logger.info(f" MDGs : {len(self.mdgs_indicator_ids)} indikator") + self.logger.info(f" SDGs : {len(self.sdgs_indicator_ids)} indikator") + + for fw in ["MDGs", "SDGs"]: + fw_inds = ( + self.df[self.df["framework"] == fw] + .drop_duplicates("indicator_id")[["indicator_id", "indicator_name"]] + .sort_values("indicator_name") + ) + self.logger.info(f"\n {fw} indicators ({len(fw_inds)}):") + for _, row in fw_inds.iterrows(): + self.logger.info(f" [{int(row['indicator_id'])}] {row['indicator_name']}") + + # ========================================================================= + # CORE HELPER: _get_norm_value_df() + # ========================================================================= + # PERBAIKAN: + # Fungsi ini TIDAK lagi melakukan normalisasi ulang per indikator. + # Kolom norm_value_1_100 sudah dihitung sekali di analytical_layer + # dengan referensi global (semua tahun, semua negara, per indikator). + # + # Yang dilakukan di sini hanya: + # 1. Membaca norm_value_1_100 dari df + # 2. Mengubah skala 1-100 → 0-1 (untuk keperluan rata-rata agregat) + # dengan rumus linear: norm_0_1 = (norm_1_100 - 1) / 99 + # + # Rescaling agregat (0-1 → 1-100) tetap dilakukan via global_minmax() + # di masing-masing fungsi calc_* untuk menghasilkan skor level pillar/country/asean. + # ========================================================================= + + def _get_norm_value_df(self) -> pd.DataFrame: + """ + Mengembalikan df dengan kolom 'norm_value' (skala 0-1) yang diturunkan + dari norm_value_1_100 (sudah ada di source, dihitung di analytical_layer). + + Transformasi: norm_value = (norm_value_1_100 - 1) / 99 + Ini adalah transformasi LINEAR — tidak mengubah urutan relatif antar indikator, + negara, atau tahun. Komparabilitas lintas framework tetap terjaga. + """ + df = self.df.copy() + + # Konversi 1-100 → 0-1 secara linear + df["norm_value"] = np.where( + df["norm_value_1_100"].notna(), + (df["norm_value_1_100"] - 1.0) / 99.0, + np.nan + ) + + n_null = df["norm_value"].isna().sum() + n_valid = df["norm_value"].notna().sum() + self.logger.debug( + f" _get_norm_value_df: {n_valid:,} valid | {n_null:,} null " + f"(dari norm_value_1_100 analytical_layer)" + ) return df - def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 5/5] Apply column constraints...") - return apply_column_constraints(df) + # ========================================================================= + # STEP 2: agg_pillar_composite + # ========================================================================= - # ------------------------------------------------------------------ - # VALIDATION - # ------------------------------------------------------------------ + def calc_pillar_composite(self) -> pd.DataFrame: + table_name = "agg_pillar_composite" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 2: {table_name}") + self.logger.info("=" * 70) - def validate_data(self, df: pd.DataFrame) -> Dict: - validation = { - 'total_rows' : int(len(df)), - 'total_columns' : int(len(df.columns)), - 'duplicate_count' : int(df.duplicated().sum()), - 'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)), - 'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2)) - } - if 'year' in df.columns: - validation['year_range'] = { - 'min' : int(df['year'].min()) if not df['year'].isnull().all() else None, - 'max' : int(df['year'].max()) if not df['year'].isnull().all() else None, - 'unique_years': int(df['year'].nunique()) - } - for col in ('pillar', 'direction', 'source'): - if col in df.columns: - validation[f'{col}_breakdown'] = { - str(k): int(v) for k, v in df[col].value_counts().to_dict().items() - } - if 'indicator_standardized' in df.columns: - validation['unique_indicators'] = int(df['indicator_standardized'].nunique()) - if 'country' in df.columns: - validation['unique_countries'] = int(df['country'].nunique()) + df_normed = self._get_norm_value_df() - column_length_check = {} - for col, max_len in COLUMN_CONSTRAINTS.items(): - if col in df.columns: - max_actual = df[col].astype(str).str.len().max() - column_length_check[col] = { - 'max_length_constraint': max_len, - 'max_actual_length' : int(max_actual), - 'within_limit' : bool(max_actual <= max_len) - } - validation['column_length_check'] = column_length_check - return validation + df = ( + df_normed + .groupby(["pillar_id", "pillar_name", "year"]) + .agg( + pillar_norm =("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), + ) + .reset_index() + ) - # ------------------------------------------------------------------ + df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) + df["rank_in_year"] = ( + df.groupby("year")["pillar_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") + df = add_condition_column(df, "pillar_score_1_100") + log_condition_summary(df, table_name, self.logger) + + df["pillar_id"] = df["pillar_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["n_countries"] = safe_int(df["n_countries"], col_name="n_countries", logger=self.logger) + df["rank_in_year"] = df["rank_in_year"].astype(int) + df["pillar_norm"] = df["pillar_norm"].astype(float) + df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) + + schema = [ + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 3: agg_pillar_by_country + # ========================================================================= + + def calc_pillar_by_country(self) -> pd.DataFrame: + table_name = "agg_pillar_by_country" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 3: {table_name}") + self.logger.info("=" * 70) + + df_normed = self._get_norm_value_df() + + df = ( + df_normed + .groupby(["country_id", "country_name", "pillar_id", "pillar_name", "year"]) + .agg(pillar_country_norm=("norm_value", "mean")) + .reset_index() + ) + + df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) + df["rank_in_pillar_year"] = ( + df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") + df = add_condition_column(df, "pillar_country_score_1_100") + log_condition_summary(df, table_name, self.logger) + + df["country_id"] = df["country_id"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["year"] = df["year"].astype(int) + df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) + df["pillar_country_norm"] = df["pillar_country_norm"].astype(float) + df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 4: agg_framework_by_country + # ========================================================================= + # PERBAIKAN: + # - Flag NORMALIZE_FRAMEWORKS_JOINTLY dihapus. + # - Tidak ada lagi rescaling ulang per-framework di sini. + # - Semua framework (Total, MDGs, SDGs) menggunakan norm_value yang SAMA + # sebagai basis (sudah comparable dari analytical_layer). + # - global_minmax() hanya digunakan SEKALI untuk mengubah norm agregat + # (rata-rata norm_value per country-framework-year) menjadi skor 1-100 + # di level country-framework, menggunakan SATU POOL DATA BERSAMA. + # - Dengan ini, perbandingan skor MDGs vs SDGs per negara adalah valid. + # ========================================================================= + + def _calc_country_composite_inmemory(self) -> pd.DataFrame: + df_normed = self._get_norm_value_df() + df = ( + df_normed + .groupby(["country_id", "country_name", "year"]) + .agg( + composite_score=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + ) + .reset_index() + ) + df["score_1_100"] = global_minmax(df["composite_score"]) + df["rank_in_asean"] = ( + df.groupby("year")["score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id"], "score_1_100") + df["country_id"] = df["country_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["composite_score"] = df["composite_score"].astype(float) + df["score_1_100"] = df["score_1_100"].astype(float) + df["rank_in_asean"] = df["rank_in_asean"].astype(int) + return df + + def calc_framework_by_country(self) -> pd.DataFrame: + table_name = "agg_framework_by_country" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 4: {table_name}") + self.logger.info("=" * 70) + self.logger.info( + " [PERBAIKAN] Semua framework di-aggregate dari norm_value yang SAMA." + "\n Tidak ada rescaling per-framework. Skor MDGs dan SDGs comparable." + ) + + country_composite = self._calc_country_composite_inmemory() + df_normed = self._get_norm_value_df() + parts = [] + + # ── Layer TOTAL ─────────────────────────────────────────────────────── + agg_total = ( + country_composite[[ + "country_id", "country_name", "year", + "score_1_100", "n_indicators", "composite_score" + ]] + .copy() + .rename(columns={ + "score_1_100" : "framework_score_1_100", + "composite_score": "framework_norm" + }) + ) + agg_total["framework"] = "Total" + parts.append(agg_total) + + # ── Layer MDGs pre-SDGs (tahun sebelum sdgs_start_year) ────────────── + pre_sdgs_rows = country_composite[ + country_composite["year"] < self.sdgs_start_year + ].copy() + if not pre_sdgs_rows.empty: + mdgs_pre = ( + pre_sdgs_rows[[ + "country_id", "country_name", "year", + "score_1_100", "n_indicators", "composite_score" + ]] + .copy() + .rename(columns={ + "score_1_100" : "framework_score_1_100", + "composite_score": "framework_norm" + }) + ) + mdgs_pre["framework"] = "MDGs" + parts.append(mdgs_pre) + + # ── Layer MDGs mixed (setelah SDGs masuk, hanya indikator MDGs) ────── + if self.mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + agg_mdgs_mixed = ( + df_mdgs_mixed + .groupby(["country_id", "country_name", "year"]) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) + .reset_index() + ) + # PERBAIKAN: rescale dari POOL GABUNGAN bersama SDGs (lihat bawah) + agg_mdgs_mixed["framework"] = "MDGs" + parts.append(agg_mdgs_mixed) + + # ── Layer SDGs (hanya indikator SDGs, mulai sdgs_start_year) ───────── + if self.sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + agg_sdgs = ( + df_sdgs + .groupby(["country_id", "country_name", "year"]) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) + .reset_index() + ) + agg_sdgs["framework"] = "SDGs" + parts.append(agg_sdgs) + + df = pd.concat(parts, ignore_index=True) + + # PERBAIKAN: Rescale framework_score_1_100 dari SATU POOL BERSAMA + # untuk semua framework (MDGs mixed + SDGs) sekaligus. + # Ini memastikan skor 60 di MDGs dan skor 60 di SDGs memiliki makna + # yang sama: posisi relatif yang sama dalam distribusi gabungan. + mixed_mask = df["framework"].isin(["MDGs", "SDGs"]) + mixed_pre_mask = (df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year) + + # Rescale pre-SDGs MDGs dari pool Total (sudah dihitung) + # → sudah ada di agg_total (framework_score_1_100 = dari country_composite) + + # Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA + post_sdgs_mask = mixed_mask & ~mixed_pre_mask & df["framework_norm"].notna() + if post_sdgs_mask.any(): + df.loc[post_sdgs_mask, "framework_score_1_100"] = global_minmax( + df.loc[post_sdgs_mask, "framework_norm"] + ) + + df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) + + # Pastikan kolom framework_score_1_100 ada untuk semua baris + if "framework_score_1_100" not in df.columns: + df["framework_score_1_100"] = np.nan + + df["rank_in_framework_year"] = ( + df.groupby(["framework", "year"])["framework_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") + df = add_condition_column(df, "framework_score_1_100") + log_condition_summary(df, table_name, self.logger) + + # Log diagnostik: bandingkan skor MDGs vs SDGs + self._log_framework_score_diagnostics(df, table_name) + + df["country_id"] = df["country_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["rank_in_framework_year"] = safe_int(df["rank_in_framework_year"], col_name="rank_in_framework_year", logger=self.logger) + df["framework_norm"] = df["framework_norm"].astype(float) + df["framework_score_1_100"] = df["framework_score_1_100"].astype(float) + + self._validate_mdgs_equals_total(df, level="country") + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 5: agg_framework_asean + # ========================================================================= + # PERBAIKAN: Sama dengan framework_by_country — tidak ada rescaling terpisah + # per framework. MDGs mixed dan SDGs di-rescale dari satu pool bersama. + # ========================================================================= + + def calc_framework_asean(self) -> pd.DataFrame: + table_name = "agg_framework_asean" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 5: {table_name}") + self.logger.info("=" * 70) + self.logger.info( + " [PERBAIKAN] MDGs mixed + SDGs di-rescale dari SATU POOL BERSAMA." + "\n Skor ASEAN MDGs dan SDGs sekarang comparable." + ) + + df_normed = self._get_norm_value_df() + country_composite = self._calc_country_composite_inmemory() + + country_norm = ( + df_normed + .groupby(["country_id", "country_name", "year"])["norm_value"] + .mean().reset_index() + .rename(columns={"norm_value": "country_norm"}) + ) + asean_overall = ( + country_norm.groupby("year") + .agg( + asean_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries =("country_norm", "count") + ) + .reset_index() + ) + asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) + + parts = [] + + # ── Layer TOTAL ─────────────────────────────────────────────────────── + total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() + total_cols = total_cols.rename(columns={ + "asean_score_1_100": "framework_score_1_100", + "asean_norm" : "framework_norm", + "n_countries" : "n_countries_with_data", + }) + n_ind_total = df_normed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + total_cols = total_cols.merge(n_ind_total, on="year", how="left") + total_cols["framework"] = "Total" + parts.append(total_cols) + + # ── Layer MDGs pre-SDGs ─────────────────────────────────────────────── + pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() + if not pre_sdgs.empty: + mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() + mdgs_pre = mdgs_pre.rename(columns={ + "asean_score_1_100": "framework_score_1_100", + "asean_norm" : "framework_norm", + "n_countries" : "n_countries_with_data", + }) + n_ind_pre = ( + df_normed[df_normed["year"] < self.sdgs_start_year] + .groupby("year")["indicator_id"].nunique() + .reset_index().rename(columns={"indicator_id": "n_indicators"}) + ) + mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") + mdgs_pre["framework"] = "MDGs" + parts.append(mdgs_pre) + + # ── Siapkan MDGs mixed dan SDGs untuk rescaling BERSAMA ─────────────── + mixed_parts = [] + + if self.mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + cn = ( + df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) + asean_mdgs = cn.groupby("year").agg( + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data =("country_id", "count"), + ).reset_index() + n_ind_mdgs = df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") + asean_mdgs["framework"] = "MDGs" + mixed_parts.append(asean_mdgs) + + if self.sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + cn = ( + df_sdgs.groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) + asean_sdgs = cn.groupby("year").agg( + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data =("country_id", "count"), + ).reset_index() + n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") + asean_sdgs["framework"] = "SDGs" + mixed_parts.append(asean_sdgs) + + # PERBAIKAN: Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA + if mixed_parts: + df_mixed = pd.concat(mixed_parts, ignore_index=True) + df_mixed["framework_score_1_100"] = global_minmax(df_mixed["framework_norm"]) + parts.append(df_mixed) + + df = pd.concat(parts, ignore_index=True) + + df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) + df = add_yoy(df, ["framework"], "framework_score_1_100") + df = add_condition_column(df, "framework_score_1_100") + log_condition_summary(df, table_name, self.logger) + + # Log diagnostik: bandingkan skor ASEAN MDGs vs SDGs + self._log_framework_score_diagnostics(df, table_name) + + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) + for col in ["framework_norm", "std_norm", "framework_score_1_100"]: + df[col] = df[col].astype(float) + + self._validate_mdgs_equals_total(df, level="asean") + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries_with_data", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), + ] + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 6 & 7: Narrative (tidak ada perubahan) + # ========================================================================= + + def calc_narrative_overview(self, df_framework_asean, df_framework_by_country): + table_name = "agg_narrative_overview" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 6: {table_name}") + self.logger.info("=" * 70) + + asean_total = df_framework_asean[df_framework_asean["framework"] == "Total"].sort_values("year").reset_index(drop=True) + score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) + country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() + ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) + records = [] + + for _, row in asean_total.iterrows(): + yr = int(row["year"]) + score = float(row["framework_score_1_100"]) + yoy = row["year_over_year_change"] + yoy_val = float(yoy) if pd.notna(yoy) else None + + yr_ind = ind_year[ind_year["year"] == yr] + n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique()) + n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique()) + n_total_ind = int(yr_ind["indicator_id"].nunique()) + prev_score = score_by_year.get(yr - 1, None) + yoy_pct = ((yoy_val / prev_score * 100) if (yoy_val is not None and prev_score and prev_score != 0) else None) + + yr_country = country_total[country_total["year"] == yr].sort_values("rank_in_framework_year").reset_index(drop=True) + ranking_list = [] + for _, cr in yr_country.iterrows(): + cr_yoy = cr.get("year_over_year_change", None) + ranking_list.append({ + "rank" : int(cr["rank_in_framework_year"]), + "country_name": str(cr["country_name"]), + "score" : round(float(cr["framework_score_1_100"]), 2), + "yoy_change" : round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + }) + + yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) + if not yr_country_yoy.empty: + best_idx = yr_country_yoy["year_over_year_change"].idxmax() + worst_idx = yr_country_yoy["year_over_year_change"].idxmin() + most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) + most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) + most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) + most_declined_delta = round(float(yr_country_yoy.loc[worst_idx, "year_over_year_change"]), 2) + else: + most_improved_country = most_declined_country = None + most_improved_delta = most_declined_delta = None + + narrative = _build_overview_narrative( + year=yr, n_mdg=n_mdg, n_sdg=n_sdg, n_total_ind=n_total_ind, + score=score, yoy_val=yoy_val, yoy_pct=yoy_pct, + prev_year=yr-1, prev_score=prev_score, ranking_list=ranking_list, + most_improved_country=most_improved_country, most_improved_delta=most_improved_delta, + most_declined_country=most_declined_country, most_declined_delta=most_declined_delta, + ) + + records.append({ + "year" : yr, + "n_mdg_indicators" : n_mdg, + "n_sdg_indicators" : n_sdg, + "n_total_indicators" : n_total_ind, + "asean_total_score" : round(score, 2), + "yoy_change" : yoy_val, + "yoy_change_pct" : round(yoy_pct, 2) if yoy_pct is not None else None, + "country_ranking_json" : json.dumps(ranking_list, ensure_ascii=False), + "most_improved_country": most_improved_country, + "most_improved_delta" : most_improved_delta, + "most_declined_country": most_declined_country, + "most_declined_delta" : most_declined_delta, + "narrative_overview" : narrative, + }) + + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["n_mdg_indicators"] = df["n_mdg_indicators"].astype(int) + df["n_sdg_indicators"] = df["n_sdg_indicators"].astype(int) + df["n_total_indicators"] = df["n_total_indicators"].astype(int) + df["asean_total_score"] = df["asean_total_score"].astype(float) + for col in ["yoy_change", "yoy_change_pct", "most_improved_delta", "most_declined_delta"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_mdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_sdg_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_total_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("asean_total_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_change_pct", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("country_ranking_json", "STRING", mode="REQUIRED"), + bigquery.SchemaField("most_improved_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_improved_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("most_declined_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + def calc_narrative_pillar(self, df_pillar_composite, df_pillar_by_country): + table_name = "agg_narrative_pillar" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 7: {table_name}") + self.logger.info("=" * 70) + + records = [] + for yr in sorted(df_pillar_composite["year"].unique()): + yr_pillars = df_pillar_composite[df_pillar_composite["year"] == yr].sort_values("rank_in_year").reset_index(drop=True) + yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] + strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None + weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None + + yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) + if not yr_pillars_yoy.empty: + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) + most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) + most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) + most_declined_delta = round(float(yr_pillars_yoy.loc[worst_p_idx, "year_over_year_change"]), 2) + else: + most_improved_pillar = most_declined_pillar = None + most_improved_delta = most_declined_delta = None + + for _, prow in yr_pillars.iterrows(): + p_id = int(prow["pillar_id"]) + p_country = yr_country_pillar[yr_country_pillar["pillar_id"] == p_id].sort_values("rank_in_pillar_year").reset_index(drop=True) + top_country = bot_country = None + top_country_score = bot_country_score = None + if not p_country.empty: + top_country = str(p_country.iloc[0]["country_name"]) + top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) + bot_country = str(p_country.iloc[-1]["country_name"]) + bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) + + p_yoy = prow["year_over_year_change"] + narrative = _build_pillar_narrative( + year=yr, pillar_name=str(prow["pillar_name"]), + pillar_score=float(prow["pillar_score_1_100"]), + rank_in_year=int(prow["rank_in_year"]), n_pillars=len(yr_pillars), + yoy_val=float(p_yoy) if pd.notna(p_yoy) else None, + top_country=top_country, top_country_score=top_country_score, + bot_country=bot_country, bot_country_score=bot_country_score, + strongest_pillar=str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, + strongest_score=round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, + weakest_pillar=str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, + weakest_score=round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, + most_improved_pillar=most_improved_pillar, most_improved_delta=most_improved_delta, + most_declined_pillar=most_declined_pillar, most_declined_delta=most_declined_delta, + ) + records.append({ + "year" : yr, + "pillar_id" : p_id, + "pillar_name" : str(prow["pillar_name"]), + "pillar_score" : round(float(prow["pillar_score_1_100"]), 2), + "rank_in_year" : int(prow["rank_in_year"]), + "yoy_change" : float(p_yoy) if pd.notna(p_yoy) else None, + "top_country" : top_country, + "top_country_score" : top_country_score, + "bottom_country" : bot_country, + "bottom_country_score": bot_country_score, + "narrative_pillar" : narrative, + }) + + df = pd.DataFrame(records) + df["year"] = df["year"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["rank_in_year"] = df["rank_in_year"].astype(int) + for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: + df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("top_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("top_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("bottom_country", "STRING", mode="NULLABLE"), + bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # DIAGNOSTIK & VALIDASI + # ========================================================================= + + def _log_framework_score_diagnostics(self, df: pd.DataFrame, context: str): + """ + Log perbandingan rata-rata skor per framework. + Setelah perbaikan, gap antar framework mencerminkan perbedaan substantif, + bukan artefak normalisasi. + """ + self.logger.info(f"\n [DIAGNOSTIK] Rata-rata skor per framework ({context}):") + fw_means = df.groupby("framework")["framework_score_1_100"].agg(['mean', 'min', 'max']).round(2) + for fw, row in fw_means.iterrows(): + self.logger.info( + f" {fw:<8} mean={row['mean']:>6.2f} " + f"range=[{row['min']:.2f}, {row['max']:.2f}]" + ) + + if "MDGs" in fw_means.index and "SDGs" in fw_means.index: + gap = fw_means.loc["MDGs", "mean"] - fw_means.loc["SDGs", "mean"] + self.logger.info( + f"\n Gap MDGs-SDGs = {gap:.2f} poin" + + ( + " → SUBSTANTIF (indikator SDGs mengukur deprivasi lebih dalam)" + if abs(gap) > 10 else + " → dalam batas wajar" + ) + ) + + def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): + self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") + group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] + mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) + total_pre = df[(df["framework"] == "Total") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) + if mdgs_pre.empty and total_pre.empty: + self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") + return + if mdgs_pre.empty or total_pre.empty: + self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}") + return + check = mdgs_pre.merge(total_pre, on=group_by) + max_diff = (check["mdgs_score"] - check["total_score"]).abs().max() + status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" + self.logger.info(f" -> {status} (n_checked={len(check)})") + + def _finalize(self, table_name: str, rows_loaded: int): + self.load_metadata[table_name].update({"rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now()}) + log_update(self.client, "DW", table_name, "full_load", rows_loaded) + self.logger.info(f" {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") + + def _fail(self, table_name: str, error: Exception): + self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) + self.logger.error(f" [FAIL] {table_name}: {error}") + log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) + + # ========================================================================= # RUN - # ------------------------------------------------------------------ + # ========================================================================= - def run(self, df: pd.DataFrame) -> int: - """ - Execute full cleaning pipeline -> load ke STAGING (Silver). - - Returns: - int: Rows loaded - """ - self.metadata['start_time'] = datetime.now() - self.metadata['rows_fetched'] = len(df) - - if df.empty: - print(" ERROR: DataFrame is empty, nothing to process.") - return 0 - - df = self._step_standardize_countries(df) - df = self._step_remove_missing(df) - df = self._step_remove_duplicates(df) - df = self._step_add_classifications(df) - df = self._step_apply_constraints(df) - - self.metadata['rows_transformed'] = len(df) - - validation = self.validate_data(df) - self.metadata['validation_metrics'] = validation - - all_within_limits = all( - info['within_limit'] - for info in validation.get('column_length_check', {}).values() + def run(self): + start = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold") + self.logger.info(f" Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + self.logger.info( + " NORMALISASI: norm_value dari analytical_layer (satu referensi global)." + "\n Tidak ada rescaling per-framework. MDGs dan SDGs comparable." ) - if not all_within_limits: - print("\n WARNING: Some columns still exceed length constraints!") - for col, info in validation['column_length_check'].items(): - if not info['within_limit']: - print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}") + self.logger.info("=" * 70) - print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...") - rows_loaded = load_to_bigquery( - self.client, df, self.table_name, - layer='silver', - write_disposition="WRITE_TRUNCATE", - schema=self.SCHEMA - ) - self.metadata['rows_loaded'] = rows_loaded + self.load_data() + self._classify_indicators() - log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded) + df_pillar_composite = self.calc_pillar_composite() + df_pillar_by_country = self.calc_pillar_by_country() + df_framework_by_country = self.calc_framework_by_country() + df_framework_asean = self.calc_framework_asean() + self.calc_narrative_overview(df_framework_asean=df_framework_asean, df_framework_by_country=df_framework_by_country) + self.calc_narrative_pillar(df_pillar_composite=df_pillar_composite, df_pillar_by_country=df_pillar_by_country) - self.metadata['end_time'] = datetime.now() - self.metadata['duration_seconds'] = ( - self.metadata['end_time'] - self.metadata['start_time'] - ).total_seconds() - self.metadata['execution_timestamp'] = self.metadata['start_time'] - self.metadata['completeness_pct'] = validation.get('completeness_pct', 0) - self.metadata['config_snapshot'] = json.dumps({'load_mode': self.load_mode}) - self.metadata['validation_metrics'] = json.dumps(validation) - save_etl_metadata(self.client, self.metadata) + duration = (datetime.now() - start).total_seconds() + total_rows = sum(m["rows_loaded"] for m in self.load_metadata.values()) - print(f"\n Cleaned Integration completed: {rows_loaded:,} rows") - print(f" Duration : {self.metadata['duration_seconds']:.2f}s") - print(f" Completeness : {validation['completeness_pct']:.2f}%") - if 'year_range' in validation: - yr = validation['year_range'] - if yr['min'] and yr['max']: - print(f" Year range : {yr['min']}-{yr['max']}") - print(f" Indicators : {validation.get('unique_indicators', '-')}") - print(f" Countries : {validation.get('unique_countries', '-')}") - print(f"\n Schema Validation:") - for col, info in validation.get('column_length_check', {}).items(): - status = "OK" if info['within_limit'] else "FAIL" - print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}") - print(f"\n Metadata -> [AUDIT] etl_metadata") - - return rows_loaded + self.logger.info("\n" + "=" * 70) + self.logger.info("SELESAI") + self.logger.info("=" * 70) + self.logger.info(f" Durasi : {duration:.2f}s") + self.logger.info(f" Total rows : {total_rows:,}") + for tbl, meta in self.load_metadata.items(): + icon = "OK" if meta["status"] == "success" else "FAIL" + self.logger.info(f" [{icon}] {tbl:<35} {meta['rows_loaded']:>10,}") # ============================================================================= -# AIRFLOW TASK FUNCTIONS +# AIRFLOW & MAIN # ============================================================================= -def run_cleaned_integration(): - """ - Airflow task: Load cleaned_integrated dari staging_integrated. - Dipanggil oleh DAG setelah task staging_integration_to_silver selesai. - """ +def run_aggregation(): from scripts.bigquery_config import get_bigquery_client - client = get_bigquery_client() - df_staging = load_staging_data(client) - loader = CleanedDataLoader(client, load_mode='full_refresh') - rows = loader.run(df_staging) - print(f"Cleaned layer loaded: {rows:,} rows") + client = get_bigquery_client() + agg = FoodSecurityAggregator(client) + agg.run() + total = sum(m["rows_loaded"] for m in agg.load_metadata.values()) + print(f"Aggregation completed: {total:,} total rows loaded") -# ============================================================================= -# MAIN EXECUTION -# ============================================================================= - if __name__ == "__main__": - print("=" * 60) - print("BIGQUERY CLEANED LAYER ETL") - print("Kimball DW Architecture") - print(" Input : STAGING (Silver) -> staging_integrated") - print(" Output : STAGING (Silver) -> cleaned_integrated") - print(" Audit : AUDIT -> etl_logs, etl_metadata") - print("=" * 60) + import io + if _sys.stdout.encoding and _sys.stdout.encoding.lower() not in ("utf-8", "utf8"): + _sys.stdout = io.TextIOWrapper(_sys.stdout.buffer, encoding="utf-8", errors="replace") + if _sys.stderr.encoding and _sys.stderr.encoding.lower() not in ("utf-8", "utf8"): + _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") - logger = setup_logging() - client = get_bigquery_client() - df_staging = load_staging_data(client) + print("=" * 70) + print("FOOD SECURITY AGGREGATION -> fs_asean_gold") + print(f"Condition threshold: bad<{THRESHOLD_BAD}, moderate {THRESHOLD_BAD}-{THRESHOLD_GOOD}, good>{THRESHOLD_GOOD}") + print("NORMALISASI: satu referensi global per indikator (dari analytical_layer).") + print("Tidak ada rescaling per-framework. MDGs dan SDGs comparable.") + print("=" * 70) - print("\n[1/1] Cleaned Integration -> STAGING (Silver)...") - loader = CleanedDataLoader(client, load_mode='full_refresh') - final_count = loader.run(df_staging) + logger = setup_logging() + for handler in logger.handlers: + handler.__class__ = _SafeStreamHandler - print("\n" + "=" * 60) - print("[OK] CLEANED LAYER ETL COMPLETED") - print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") - print(f" AUDIT : etl_logs, etl_metadata") - print("=" * 60) \ No newline at end of file + client = get_bigquery_client() + agg = FoodSecurityAggregator(client) + agg.run() + + print("\n" + "=" * 70) + print("[OK] SELESAI") + print("=" * 70) \ No newline at end of file