diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 4b64008..3b61593 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,7 +8,7 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Determine SDG start year & assign framework (MDGs/SDGs) per indicator PER ROW +6. Assign framework (MDGs/SDGs) per indicator PER ROW 7. Verify no gaps 8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) 9. Calculate YoY per indicator per country @@ -22,16 +22,17 @@ NORMALISASI (Step 8): sehingga nilai antar negara dan antar tahun tetap comparable - Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio -FRAMEWORK LOGIC (Row-Level Assignment): -- SDG start year dideteksi dari data: tahun pertama indikator FIES/anaemia lengkap - di semua fixed countries (setelah Step 3-5 filter selesai) -- Framework di-assign PER BARIS (per tahun): - * year < sdg_start_year → selalu 'MDGs' (semua indikator) - * year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS → 'SDGs' - * selain itu (implisit) → 'MDGs' -- Hanya FIES dan anaemia yang masuk SDG_ONLY_KEYWORDS karena murni baru di era SDGs. -- Shared indicators (stunting, wasting, overweight, undernourishment) tidak terdaftar - di SDG_ONLY_KEYWORDS sehingga secara implisit selalu berlabel 'MDGs' di semua tahun. +FRAMEWORK LOGIC (FIX - Per Indicator, Per Row): +- Framework di-assign PER BARIS dengan mempertimbangkan actual_start_year MASING-MASING + indikator, bukan satu sdg_start_year global. +- Logika: + * Jika nama indikator TIDAK ada di SDG_ONLY_KEYWORDS → selalu 'MDGs' (semua tahun) + * Jika nama indikator ADA di SDG_ONLY_KEYWORDS: + - row['year'] >= actual_start_year[indicator] → 'SDGs' + - row['year'] < actual_start_year[indicator] → 'MDGs' +- Baris dengan year < actual_start_year TETAP ADA di data (tidak dihapus di Step 5), + hanya mendapat label 'MDGs'. +- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter """ import pandas as pd @@ -61,8 +62,8 @@ from google.cloud import bigquery # SDG-ONLY INDICATOR KEYWORDS # ============================================================================= # Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. -# Baris dengan year >= sdg_start_year + nama ada di set ini → 'SDGs'. -# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' secara implisit. +# Indikator di set ini → 'SDGs' mulai dari actual_start_year indikator tersebut. +# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun. SDG_ONLY_KEYWORDS = frozenset([ # TARGET 2.1.2 — FIES (SDGs only) @@ -83,19 +84,9 @@ SDG_ONLY_KEYWORDS = frozenset([ "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) -# Proxy keywords untuk deteksi era SDGs dari data (indikator murni baru di SDGs) -_SDG_ERA_PROXY_KEYWORDS = frozenset([ - "food insecurity", - "anemia", - "anaemia", -]) - # ============================================================================= # THRESHOLD KONDISI (fixed absolute, skala 1-100) # ============================================================================= -# bad : norm_value_1_100 < THRESHOLD_BAD -# good : norm_value_1_100 > THRESHOLD_GOOD -# moderate : di antara keduanya THRESHOLD_BAD = 40.0 THRESHOLD_GOOD = 60.0 @@ -104,8 +95,6 @@ THRESHOLD_GOOD = 60.0 def assign_condition(norm_value_1_100: float) -> str: """ Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). - Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert). - Returns: 'good' / 'moderate' / 'bad' """ if pd.isna(norm_value_1_100): @@ -117,38 +106,6 @@ def assign_condition(norm_value_1_100: float) -> str: return 'moderate' -def assign_framework_for_row( - indicator_name: str, - row_year: int, - sdg_start_year: int, -) -> str: - """ - Tentukan framework (MDGs/SDGs) PER BARIS (per tahun). - - Logic: - ───────────────────────────────────────────────────────────────────────── - RULE 1: row_year < sdg_start_year - → selalu 'MDGs', tanpa kecuali. - - RULE 2: row_year >= sdg_start_year AND nama ada di SDG_ONLY_KEYWORDS - → 'SDGs' - - RULE 3 (implisit): semua kondisi lain - → 'MDGs' - Ini mencakup shared indicators (stunting, wasting, overweight, - undernourishment) yang tidak terdaftar di SDG_ONLY_KEYWORDS, - sehingga tidak perlu di-list secara eksplisit. - ───────────────────────────────────────────────────────────────────────── - """ - if row_year < sdg_start_year: - return 'MDGs' - - if str(indicator_name).lower().strip() in SDG_ONLY_KEYWORDS: - return 'SDGs' - - return 'MDGs' - - # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -162,13 +119,16 @@ class AnalyticalLayerLoader: indicator_id, indicator_name, direction, framework, pillar_id, pillar_name, time_id, year, value, - norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware + norm_value_1_100, yoy_change, yoy_pct - FRAMEWORK LOGIC: - - year < sdg_start_year → 'MDGs' (semua indikator) - - year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS → 'SDGs' (FIES + anaemia) - - selain itu (implisit) → 'MDGs' + FRAMEWORK LOGIC (FIX): + - Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun + - Indikator DI SDG_ONLY_KEYWORDS: + year >= actual_start_year[indikator] → 'SDGs' + year < actual_start_year[indikator] → 'MDGs' + - actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter + - Baris year < actual_start_year TETAP ADA, hanya berlabel 'MDGs' """ def __init__(self, client: bigquery.Client): @@ -181,13 +141,14 @@ class AnalyticalLayerLoader: self.df_country = None self.df_pillar = None - self.selected_country_ids = None + self.selected_country_ids = None + self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5) self.start_year = 2013 self.end_year = None - self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap) + self.baseline_year = 2023 - self.sdg_start_year = None + self.sdg_start_year = None # disimpan untuk metadata/logging saja self.pipeline_metadata = { 'source_class' : self.__class__.__name__, @@ -490,19 +451,22 @@ class AnalyticalLayerLoader: if not valid_indicators: raise ValueError("No valid indicators found after filtering!") + # ---------------------------------------------------------------- + # Filter hanya indikator yang valid + # TIDAK menghapus baris year < max_start_year — + # semua baris tetap ada, label framework ditentukan di Step 6 + # ---------------------------------------------------------------- original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() - self.df_clean = self.df_clean.merge( - indicator_max_start[['indicator_id', 'max_start_year']], - on='indicator_id', how='left' + # Simpan max_start_year sebagai lookup untuk Step 6 + self.indicator_max_start_map = ( + indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] + .set_index('indicator_id')['max_start_year'] + .to_dict() ) - self.df_clean = self.df_clean[ - self.df_clean['year'] >= self.df_clean['max_start_year'] - ].copy() - self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") @@ -512,74 +476,123 @@ class AnalyticalLayerLoader: return self.df_clean # ------------------------------------------------------------------ - # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL) + # STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year) # ------------------------------------------------------------------ def determine_sdg_start_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL)") + self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)") self.logger.info("=" * 80) - indicator_actual_start = ( - self.df_clean - .groupby(['indicator_id', 'indicator_name', 'country_id'])['year'] - .min().reset_index() - .groupby(['indicator_id', 'indicator_name'])['year'] - .max().reset_index() - ) - indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] + # ---------------------------------------------------------------- + # Hitung actual_start_year PER INDIKATOR dari indicator_max_start_map + # yang sudah dihitung di Step 5. + # actual_start_year = max(min_year per country) per indikator + # = tahun di mana semua fixed countries sudah punya data + # ---------------------------------------------------------------- + indicator_actual_start = pd.DataFrame([ + {'indicator_id': ind_id, 'actual_start_year': start_yr} + for ind_id, start_yr in self.indicator_max_start_map.items() + ]) - # Deteksi sdg_start_year dari proxy SDGs-only (FIES & anaemia) - proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply( - lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS) + # Merge indicator_name untuk keperluan logging + indicator_actual_start = indicator_actual_start.merge( + self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(), + on='indicator_id', how='left' ) - df_proxy = indicator_actual_start[proxy_mask] - if df_proxy.empty: + # Tandai mana yang SDG-only + indicator_actual_start['is_sdg_only'] = ( + indicator_actual_start['indicator_name'] + .str.lower().str.strip() + .isin(SDG_ONLY_KEYWORDS) + ) + + # sdg_start_year global = min(actual_start_year dari SDG-only indicators) + # Disimpan hanya untuk metadata/logging + sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']] + if sdg_only_df.empty: raise ValueError( - "Tidak ada indikator proxy SDGs (FIES/anaemia) yang lolos filter. " + "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " "Pastikan indikator FIES dan anaemia ada di data." ) + self.sdg_start_year = int(sdg_only_df['actual_start_year'].min()) - self.sdg_start_year = int(df_proxy['actual_start_year'].min()) - self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}") - self.logger.info(f" Proxy indicators (penentu sdg_start_year):") - for _, row in df_proxy.iterrows(): - self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") - - self.logger.info(f"\n Assigning framework PER ROW...") - self.logger.info(f" year < {self.sdg_start_year} → MDGs (semua indikator)") - self.logger.info(f" year >= {self.sdg_start_year} + nama in SDG_ONLY_KEYWORDS → SDGs") - self.logger.info(f" selain itu (implisit) → MDGs") - - self.df_clean['framework'] = self.df_clean.apply( - lambda row: assign_framework_for_row( - indicator_name = row['indicator_name'], - row_year = int(row['year']), - sdg_start_year = self.sdg_start_year, - ), - axis=1 - ) - - # Log ringkasan per indikator untuk verifikasi - self.logger.info(f"\n {'Framework Assignment per Indicator':}") - self.logger.info(f" {'-'*100}") + self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:") + self.logger.info(f" {'-'*80}") + for _, row in indicator_actual_start[indicator_actual_start['is_sdg_only']].iterrows(): + self.logger.info( + f" [SDG-only] start={int(row['actual_start_year'])} | {row['indicator_name']}" + ) self.logger.info( - f" {'ID':<5} {'Indicator Name':<52} " - f"{'Pre-SDG':<10} {'MDGs':<10} {'SDGs':<10} {'SDG-Only?'}" + f"\n sdg_start_year (earliest SDG-only, for metadata): {self.sdg_start_year}" ) - self.logger.info(f" {'-'*100}") + + # Lookup: indicator_id → actual_start_year (hanya SDG-only, untuk logging) + sdg_only_start_map = ( + indicator_actual_start[indicator_actual_start['is_sdg_only']] + .set_index('indicator_id')['actual_start_year'] + .to_dict() + ) + + self.logger.info(f"\n Logika assign framework (PER BARIS, PER INDIKATOR):") + self.logger.info(f" ─────────────────────────────────────────────────────") + self.logger.info(f" Jika indikator TIDAK di SDG_ONLY_KEYWORDS:") + self.logger.info(f" → 'MDGs' di semua tahun (shared indicators)") + self.logger.info(f" Jika indikator DI SDG_ONLY_KEYWORDS:") + self.logger.info(f" year >= actual_start_year[indikator] → 'SDGs'") + self.logger.info(f" year < actual_start_year[indikator] → 'MDGs'") + self.logger.info(f" ─────────────────────────────────────────────────────") + + # ---------------------------------------------------------------- + # Assign framework dengan vectorized merge + # ---------------------------------------------------------------- + self.df_clean = self.df_clean.merge( + indicator_actual_start[['indicator_id', 'is_sdg_only', 'actual_start_year']], + on='indicator_id', + how='left' + ) + + # Assign framework: + # - Jika bukan SDG-only → 'MDGs' + # - Jika SDG-only AND year >= actual_start_year → 'SDGs' + # - Jika SDG-only AND year < actual_start_year → 'MDGs' + self.df_clean['framework'] = np.where( + self.df_clean['is_sdg_only'] & (self.df_clean['year'] >= self.df_clean['actual_start_year']), + 'SDGs', + 'MDGs' + ) + + # Drop kolom bantu + self.df_clean = self.df_clean.drop(columns=['is_sdg_only', 'actual_start_year']) + + # ---------------------------------------------------------------- + # Log verifikasi per indikator + # ---------------------------------------------------------------- + self.logger.info(f"\n Verifikasi framework per indikator:") + self.logger.info(f" {'-'*105}") + self.logger.info( + f" {'ID':<5} {'Indicator Name':<52} {'Start':<8} " + f"{'MDGs rows':<12} {'SDGs rows':<12} {'Expected'}" + ) + self.logger.info(f" {'-'*105}") for ind_id, grp in self.df_clean.groupby('indicator_id'): ind_name = grp['indicator_name'].iloc[0] - pre_sdg = (grp['year'] < self.sdg_start_year).sum() mdgs_rows = (grp['framework'] == 'MDGs').sum() sdgs_rows = (grp['framework'] == 'SDGs').sum() is_sdg_only = ind_name.lower().strip() in SDG_ONLY_KEYWORDS + start_yr = int(grp['year'].min()) + + if is_sdg_only: + ind_start = sdg_only_start_map.get(ind_id, '?') + expected = f"SDGs from {ind_start}, MDGs before" + else: + expected = "MDGs always" + self.logger.info( - f" {int(ind_id):<5} {ind_name[:50]:<52} " - f"{pre_sdg:<10} {mdgs_rows:<10} {sdgs_rows:<10} " - f"{'YES' if is_sdg_only else 'no'}" + f" {int(ind_id):<5} {ind_name[:50]:<52} {start_yr:<8} " + f"{mdgs_rows:<12} {sdgs_rows:<12} {expected}" ) fw_summary = self.df_clean['framework'].value_counts() @@ -609,23 +622,41 @@ class AnalyticalLayerLoader: self.logger.info("STEP 7: VERIFY NO GAPS") self.logger.info("=" * 80) + # ---------------------------------------------------------------- + # Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya, + # bukan dari self.start_year global, karena tiap indikator bisa + # punya start year berbeda. + # ---------------------------------------------------------------- expected_countries = len(self.selected_country_ids) - verification = self.df_clean.groupby( - ['indicator_id', 'year'] - )['country_id'].nunique().reset_index() - verification.columns = ['indicator_id', 'year', 'country_count'] - all_good = (verification['country_count'] == expected_countries).all() + all_good = True + bad_rows = [] + + for ind_id, grp in self.df_clean.groupby('indicator_id'): + actual_start = self.indicator_max_start_map.get(ind_id) + if actual_start is None: + continue + + expected_years = list(range(int(actual_start), self.end_year + 1)) + + for year in expected_years: + country_count = grp[grp['year'] == year]['country_id'].nunique() + if country_count != expected_countries: + all_good = False + bad_rows.append({ + 'indicator_id' : int(ind_id), + 'year' : int(year), + 'country_count': int(country_count), + }) if all_good: self.logger.info( f" VERIFICATION PASSED — all combinations have {expected_countries} countries" ) else: - bad = verification[verification['country_count'] != expected_countries] - for _, row in bad.head(10).iterrows(): + for row in bad_rows[:10]: self.logger.error( - f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: " - f"{int(row['country_count'])} countries (expected {expected_countries})" + f" Indicator {row['indicator_id']}, Year {row['year']}: " + f"{row['country_count']} countries (expected {expected_countries})" ) raise ValueError("Gap verification failed!") @@ -638,13 +669,7 @@ class AnalyticalLayerLoader: def calculate_norm_value(self): """ Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, - direction-aware. - - CARA KERJA: - - Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable. - - lower_better diinvert: nilai tinggi selalu = kondisi lebih baik. - - Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio. + direction-aware, global per indikator (semua negara + semua tahun). """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") @@ -936,9 +961,9 @@ class AnalyticalLayerLoader: 'fixed_countries' : len(self.selected_country_ids), 'norm_scale' : '1-100 per indicator global minmax direction-aware', 'framework_logic' : ( - 'row-level: year < sdg_start_year → MDGs always; ' - 'year >= sdg_start_year + SDG_ONLY_KEYWORDS → SDGs; ' - 'else (implicit) → MDGs' + 'per-indicator actual_start_year: ' + 'SDG-only indicator → SDGs from its own actual_start_year, MDGs before; ' + 'shared/other indicators → MDGs always' ), 'sdg_only_keywords_count' : len(SDG_ONLY_KEYWORDS), 'condition_thresholds' : { @@ -975,7 +1000,7 @@ class AnalyticalLayerLoader: self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info("Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)") + self.logger.info("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)") self.logger.info("=" * 80) self.load_source_data() @@ -1026,7 +1051,7 @@ if __name__ == "__main__": print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - print(f"Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)") + print("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)") print("=" * 80) logger = setup_logging()