diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index bf1381e..9cecb19 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,32 +8,27 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year -7. Verify no gaps -8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) + → TIDAK menghapus baris year < max_start_year + → Semua baris tetap ada; label framework ditentukan di Step 6 +6. Assign framework (MDGs/SDGs) per indicator PER ROW + → Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu + → Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs' + → Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs' + → sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators + yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries) +7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global) +8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global) 9. Calculate YoY per indicator per country 10. Analyze indicator availability by year 11. Save analytical table -NORMALISASI (Step 8): -- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100 -- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik -- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable -- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio - -FRAMEWORK LOGIC (Per-Row, bukan per indikator): -- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap - di semua fixed countries (setelah Step 3-5 filter selesai) -- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure") - Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs -- Framework di-assign PER BARIS (per year), bukan per indikator: - * row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' - * Selain itu -> 'MDGs' -- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment) - yang datanya ada sebelum SDGs: - * row lama (year < sdg_start_year) -> 'MDGs' - * row baru (year >= sdg_start_year) -> 'SDGs' +FRAMEWORK LOGIC: +- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators +- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA + sehingga label berubah serentak di satu titik waktu +- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) +- Baris mulai sdg_transition_year → 'SDGs' +- Indikator non-SDG-only → 'MDGs' selalu """ import pandas as pd @@ -60,17 +55,14 @@ from google.cloud import bigquery # ============================================================================= -# SDG INDICATOR KEYWORDS -# Daftar nama indikator (lowercase) yang masuk SDG framework. -# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year, -# dan 'MDGs' untuk baris dengan year < sdg_start_year. +# SDG-ONLY INDICATOR KEYWORDS # ============================================================================= +# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. +# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year. +# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun. -SDG_INDICATOR_KEYWORDS = frozenset([ - # TARGET 2.1.1 — Prevalence of undernourishment (shared: ada sebelum SDGs) - "prevalence of undernourishment (percent) (3-year average)", - "number of people undernourished (million) (3-year average)", - # TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs) +SDG_ONLY_KEYWORDS = frozenset([ + # TARGET 2.1.2 — FIES (SDGs only) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", @@ -83,45 +75,14 @@ SDG_INDICATOR_KEYWORDS = frozenset([ "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.1 — Stunting (shared: ada sebelum SDGs) - "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", - "number of children under 5 years of age who are stunted (modeled estimates) (million)", - # TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs) - "percentage of children under 5 years affected by wasting (percent)", - "number of children under 5 years affected by wasting (million)", - "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", - "number of children under 5 years of age who are overweight (modeled estimates) (million)", - # TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar - # baris >= sdg_start_year di-assign 'SDGs') + # TARGET 2.2.3 — Anaemia (SDGs only) "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) -# ============================================================================= -# SDG ERA PROXY KEYWORDS -# HANYA indikator yang MURNI baru di era SDGs (FIES saja). -# Dipakai untuk mendeteksi sdg_start_year dari data. -# -# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy: -# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya -# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year -# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'. -# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs -# dan dapat dipakai sebagai penanda tahun mulainya era SDGs. -# ============================================================================= -_SDG_ERA_PROXY_KEYWORDS = frozenset([ - "food insecurity", - "food insecure", -]) - # ============================================================================= # THRESHOLD KONDISI (fixed absolute, skala 1-100) # ============================================================================= -# Digunakan untuk assign kondisi di analysis_layer. -# Didefinisikan di sini agar konsisten antara kedua file. -# bad : norm_value_1_100 < THRESHOLD_BAD -# good : norm_value_1_100 > THRESHOLD_GOOD -# moderate : di antara keduanya THRESHOLD_BAD = 40.0 THRESHOLD_GOOD = 60.0 @@ -130,8 +91,6 @@ THRESHOLD_GOOD = 60.0 def assign_condition(norm_value_1_100: float) -> str: """ Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). - Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert). - Returns: 'good' / 'moderate' / 'bad' """ if pd.isna(norm_value_1_100): @@ -143,44 +102,6 @@ def assign_condition(norm_value_1_100: float) -> str: return 'moderate' -def assign_framework_per_row( - indicator_name: str, - year: int, - sdg_start_year: int, -) -> str: - """ - Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator. - - Logic: - - 'SDGs' jika KEDUA kondisi terpenuhi: - 1. Nama indikator ada di SDG_INDICATOR_KEYWORDS - 2. year (tahun baris ini) >= sdg_start_year - - 'MDGs' untuk semua kasus lain. - - Mengapa per row, bukan per indikator? - Indikator "shared" seperti anemia, stunting, wasting, undernourishment - memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan - per indikator menggunakan actual_start_year, indikator-indikator ini - akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year. - Dengan assign per row menggunakan year baris: - - baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs) - - baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs) - - Contoh anemia (sdg_start_year = 2016): - - row year=2013 -> 'MDGs' - - row year=2014 -> 'MDGs' - - row year=2015 -> 'MDGs' - - row year=2016 -> 'SDGs' - - row year=2017 -> 'SDGs' - - ... - """ - name_lower = str(indicator_name).lower().strip() - in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS - if in_sdg_list and int(year) >= sdg_start_year: - return 'SDGs' - return 'MDGs' - - # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -194,14 +115,17 @@ class AnalyticalLayerLoader: indicator_id, indicator_name, direction, framework, pillar_id, pillar_name, time_id, year, value, - norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware + norm_value_1_100, yoy_change, yoy_pct - Catatan framework: - Framework di-assign PER BARIS (per year), sehingga indikator shared - seperti anemia dapat memiliki framework berbeda di baris yang berbeda: - - baris sebelum sdg_start_year -> 'MDGs' - - baris sejak sdg_start_year -> 'SDGs' + FRAMEWORK LOGIC: + - Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun + - Indikator DI SDG_ONLY_KEYWORDS: + year < sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) + year >= sdg_transition_year → 'SDGs' + - sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators + yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan + sdg_transition_year yang SAMA agar label berubah serentak. """ def __init__(self, client: bigquery.Client): @@ -214,13 +138,13 @@ class AnalyticalLayerLoader: self.df_country = None self.df_pillar = None - self.selected_country_ids = None + self.selected_country_ids = None + self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5) + self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6) self.start_year = 2013 self.end_year = None - self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap) - - self.sdg_start_year = None + self.baseline_year = 2023 self.pipeline_metadata = { 'source_class' : self.__class__.__name__, @@ -306,15 +230,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) - # Filter single years only (is_year_range == False) - if 'is_year_range' in self.df_clean.columns: - before = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy() - self.logger.info( - f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows" - ) - - # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] baseline_indicator_count = df_baseline['indicator_id'].nunique() @@ -479,6 +394,8 @@ class AnalyticalLayerLoader: self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("=" * 80) + # Hitung max_start_year per indikator = max(min_year per country) + # = tahun pertama di mana SEMUA fixed countries sudah punya data indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() @@ -507,6 +424,8 @@ class AnalyticalLayerLoader: }) continue + # Cek apakah semua tahun dari max_start s/d end_year + # hadir di SEMUA fixed countries expected_years = list(range(max_start, self.end_year + 1)) ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] all_years_complete = True @@ -529,140 +448,173 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") - if removed_indicators: - self.logger.info(f"\n Removed indicators:") - for item in removed_indicators: - self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}") - if not valid_indicators: raise ValueError("No valid indicators found after filtering!") + # ---------------------------------------------------------------- + # Filter hanya indikator yang valid. + # PENTING: TIDAK menghapus baris year < max_start_year. + # Semua baris tetap ada — label framework ditentukan di Step 6. + # max_start_year disimpan sebagai lookup untuk Step 6 & 7. + # ---------------------------------------------------------------- original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() - self.df_clean = self.df_clean.merge( - indicator_max_start[['indicator_id', 'max_start_year']], - on='indicator_id', how='left' + # Simpan max_start_year per indicator_id untuk Step 6 dan Step 7 + self.indicator_max_start_map = ( + indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] + .set_index('indicator_id')['max_start_year'] + .to_dict() ) - self.df_clean = self.df_clean[ - self.df_clean['year'] >= self.df_clean['max_start_year'] - ].copy() - self.df_clean = self.df_clean.drop('max_start_year', axis=1) - self.logger.info(f"\n Rows before: {original_count:,}") - self.logger.info(f" Rows after: {len(self.df_clean):,}") - self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") - self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") + self.logger.info(f"\n Rows before : {original_count:,}") + self.logger.info(f" Rows after : {len(self.df_clean):,}") + self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}") + self.logger.info( + f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. " + f"Label framework akan ditentukan di Step 6." + ) return self.df_clean # ------------------------------------------------------------------ - # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW + # STEP 6: ASSIGN FRAMEWORK PER ROW # ------------------------------------------------------------------ def determine_sdg_start_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW") + self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") self.logger.info("=" * 80) - self.logger.info( - " Proxy: FIES only (food insecurity/food insecure).\n" - " Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n" - " Framework di-assign PER BARIS (year), bukan per indikator." - ) - - # actual_start_year per indikator = max(min_year per country) - # = konsisten dengan max_start_year di Step 5 - indicator_actual_start = ( - self.df_clean - .groupby(['indicator_id', 'indicator_name', 'country_id'])['year'] - .min().reset_index() - .groupby(['indicator_id', 'indicator_name'])['year'] - .max().reset_index() - ) - indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] - - # Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia) - proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply( - lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS) - ) - df_proxy = indicator_actual_start[proxy_mask] - - if df_proxy.empty: - raise ValueError( - "Tidak ada indikator proxy SDGs (FIES) yang lolos filter. " - "Pastikan indikator FIES (food insecurity/food insecure) ada di data." - ) - - self.sdg_start_year = int(df_proxy['actual_start_year'].min()) - self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}") - self.logger.info(f" Proxy indicators (FIES only):") - for _, row in df_proxy.iterrows(): - self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") # ---------------------------------------------------------------- - # Assign framework PER BARIS menggunakan year baris, bukan actual_start_year - # Sehingga indikator "shared" (anemia, stunting, dll) mendapat: - # - 'MDGs' untuk baris sebelum sdg_start_year - # - 'SDGs' untuk baris sejak sdg_start_year + # Bangun tabel actual_start_year per indikator dari + # indicator_max_start_map yang sudah ditetapkan di Step 5. # ---------------------------------------------------------------- - self.df_clean['framework'] = self.df_clean.apply( - lambda row: assign_framework_per_row( - indicator_name = row['indicator_name'], - year = int(row['year']), - sdg_start_year = self.sdg_start_year, - ), - axis=1 - ) + indicator_actual_start = pd.DataFrame([ + {'indicator_id': ind_id, 'actual_start_year': int(start_yr)} + for ind_id, start_yr in self.indicator_max_start_map.items() + ]) - # ---------------------------------------------------------------- - # Logging: ringkasan per indikator (frameworks apa yang muncul) - # ---------------------------------------------------------------- - ind_fw_summary = ( - self.df_clean - .groupby(['indicator_id', 'indicator_name'])['framework'] - .unique() - .reset_index() - ) - ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply( - lambda x: '/'.join(sorted(x)) - ) - ind_fw_summary = ind_fw_summary.merge( - indicator_actual_start[['indicator_id', 'actual_start_year']], + # Merge indicator_name untuk logging + indicator_actual_start = indicator_actual_start.merge( + self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(), on='indicator_id', how='left' ) - self.logger.info(f"\n Framework assignment per indikator:") - self.logger.info(f" {'-'*85}") - self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}") - self.logger.info(f" {'-'*85}") - for _, row in ind_fw_summary.sort_values( - ['frameworks', 'actual_start_year', 'indicator_name'] - ).iterrows(): - self.logger.info( - f" {int(row['indicator_id']):<5} {row['frameworks']:<18} " - f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}" - ) - - # Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi - split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs'] - if not split_inds.empty: - self.logger.info( - f"\n [INFO] {len(split_inds)} indikator memiliki framework split " - f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):" - ) - for _, row in split_inds.iterrows(): - self.logger.info(f" - {row['indicator_name'][:60]}") - - fw_summary = self.df_clean['framework'].value_counts() - self.logger.info( - f"\n Ringkasan rows: " + - " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()) + # Tandai mana yang SDG-only + indicator_actual_start['is_sdg_only'] = ( + indicator_actual_start['indicator_name'] + .str.lower().str.strip() + .isin(SDG_ONLY_KEYWORDS) ) + # ---------------------------------------------------------------- + # sdg_transition_year = min(actual_start_year) dari semua SDG-only + # indicators yang lolos filter. + # Ini adalah satu titik waktu di mana semua SDG-only indicators + # berubah dari 'MDGs' ke 'SDGs' secara SERENTAK. + # ---------------------------------------------------------------- + sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']] + if sdg_only_df.empty: + raise ValueError( + "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " + "Pastikan indikator FIES dan anaemia ada di data." + ) + + self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min()) + + self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:") + self.logger.info(f" {'-'*80}") + for _, row in sdg_only_df.iterrows(): + self.logger.info( + f" [SDG-only] actual_start={int(row['actual_start_year'])} | " + f"{row['indicator_name']}" + ) + self.logger.info( - f"\n [OK] 'framework' ditambahkan per row — " + f"\n sdg_transition_year = {self.sdg_transition_year} " + f"(min actual_start_year dari semua SDG-only indicators)" + ) + + self.logger.info(f"\n Logika assign framework (PER BARIS):") + self.logger.info(f" ──────────────────────────────────────────────────────────") + self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:") + self.logger.info(f" → 'MDGs' di semua tahun") + self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:") + self.logger.info(f" year < {self.sdg_transition_year} → 'MDGs' (data tetap ada)") + self.logger.info(f" year >= {self.sdg_transition_year} → 'SDGs'") + self.logger.info(f" ──────────────────────────────────────────────────────────") + + # ---------------------------------------------------------------- + # Assign framework dengan vectorized operation menggunakan + # sdg_transition_year (SATU nilai untuk semua SDG-only indicators) + # ---------------------------------------------------------------- + # Tandai apakah setiap baris adalah SDG-only indicator + sdg_only_ids = set( + indicator_actual_start.loc[ + indicator_actual_start['is_sdg_only'], 'indicator_id' + ] + ) + self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids) + + # Assign framework: + # - Bukan SDG-only → 'MDGs' + # - SDG-only AND year >= sdg_transition_year → 'SDGs' + # - SDG-only AND year < sdg_transition_year → 'MDGs' + self.df_clean['framework'] = np.where( + self.df_clean['_is_sdg_only'] & + (self.df_clean['year'] >= self.sdg_transition_year), + 'SDGs', + 'MDGs' + ) + + # Drop kolom bantu + self.df_clean = self.df_clean.drop(columns=['_is_sdg_only']) + + # ---------------------------------------------------------------- + # Log verifikasi per indikator + # ---------------------------------------------------------------- + self.logger.info(f"\n Verifikasi framework per indikator:") + self.logger.info(f" {'-'*110}") + self.logger.info( + f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} " + f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}" + ) + self.logger.info(f" {'-'*110}") + + for ind_id, grp in self.df_clean.groupby('indicator_id'): + ind_name = grp['indicator_name'].iloc[0] + mdgs_rows = (grp['framework'] == 'MDGs').sum() + sdgs_rows = (grp['framework'] == 'SDGs').sum() + is_sdg_only = ind_id in sdg_only_ids + data_from = int(grp['year'].min()) + + if is_sdg_only: + note = f"SDGs from {self.sdg_transition_year}, MDGs before" + else: + note = "MDGs always" + + self.logger.info( + f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} " + f"{mdgs_rows:<12} {sdgs_rows:<12} {note}" + ) + + fw_summary = self.df_clean['framework'].value_counts() + self.logger.info(f"\n Ringkasan rows: " + " | ".join( + f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items() + )) + + end_year_df = self.df_clean[self.df_clean['year'] == self.end_year] + fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique() + self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join( + f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items() + )) + + self.logger.info( + f"\n [OK] 'framework' ditambahkan — " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" ) @@ -677,23 +629,44 @@ class AnalyticalLayerLoader: self.logger.info("STEP 7: VERIFY NO GAPS") self.logger.info("=" * 80) + # ---------------------------------------------------------------- + # Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya, + # bukan dari self.start_year global, karena tiap indikator bisa + # punya start year berbeda. + # Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek + # karena memang tidak semua country punya data di sana. + # ---------------------------------------------------------------- expected_countries = len(self.selected_country_ids) - verification = self.df_clean.groupby( - ['indicator_id', 'year'] - )['country_id'].nunique().reset_index() - verification.columns = ['indicator_id', 'year', 'country_count'] - all_good = (verification['country_count'] == expected_countries).all() + all_good = True + bad_rows = [] + + for ind_id, grp in self.df_clean.groupby('indicator_id'): + actual_start = self.indicator_max_start_map.get(ind_id) + if actual_start is None: + continue + + expected_years = list(range(int(actual_start), self.end_year + 1)) + + for year in expected_years: + country_count = grp[grp['year'] == year]['country_id'].nunique() + if country_count != expected_countries: + all_good = False + bad_rows.append({ + 'indicator_id' : int(ind_id), + 'year' : int(year), + 'country_count': int(country_count), + }) if all_good: self.logger.info( - f" VERIFICATION PASSED — all combinations have {expected_countries} countries" + f" VERIFICATION PASSED — all combinations from actual_start_year " + f"have {expected_countries} countries" ) else: - bad = verification[verification['country_count'] != expected_countries] - for _, row in bad.head(10).iterrows(): + for row in bad_rows[:10]: self.logger.error( - f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: " - f"{int(row['country_count'])} countries (expected {expected_countries})" + f" Indicator {row['indicator_id']}, Year {row['year']}: " + f"{row['country_count']} countries (expected {expected_countries})" ) raise ValueError("Gap verification failed!") @@ -706,22 +679,7 @@ class AnalyticalLayerLoader: def calculate_norm_value(self): """ Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, - direction-aware. - - CARA KERJA: - - Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable. - - lower_better diinvert: nilai tinggi selalu = kondisi lebih baik. - Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert. - - Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio. - - Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan - (persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama. - - Catatan: - - Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer - yang skala 0-1 dan dipakai untuk agregasi composite score. - - norm_value_1_100 ini adalah per baris (per country per year per indicator), - untuk ditampilkan langsung di Looker Studio. + direction-aware, global per indikator (semua negara + semua tahun). """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") @@ -735,7 +693,10 @@ class AnalyticalLayerLoader: norm_parts = [] indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) - self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}") + self.logger.info( + f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} " + f"{'Min':>10} {'Max':>10} {'Indicator Name'}" + ) self.logger.info(f" {'-'*90}") for (ind_id, ind_name, direction), grp in indicators: @@ -749,21 +710,17 @@ class AnalyticalLayerLoader: norm_parts.append(grp) continue - raw = grp.loc[valid_mask, 'value'].values - v_min = raw.min() - v_max = raw.max() - normed = np.full(len(grp), np.nan) + raw = grp.loc[valid_mask, 'value'].values + v_min = raw.min() + v_max = raw.max() + normed = np.full(len(grp), np.nan) if v_min == v_max: - # Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100) normed[valid_mask.values] = 50.5 else: - # Min-max ke 0-1 dulu scaled = (raw - v_min) / (v_max - v_min) - # Invert jika lower_better if do_invert: scaled = 1.0 - scaled - # Scale ke 1-100 normed[valid_mask.values] = 1.0 + scaled * 99.0 grp['norm_value_1_100'] = normed @@ -776,7 +733,6 @@ class AnalyticalLayerLoader: self.df_clean = pd.concat(norm_parts, ignore_index=True) - # Statistik ringkasan valid_norm = self.df_clean['norm_value_1_100'].notna().sum() null_norm = self.df_clean['norm_value_1_100'].isna().sum() self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") @@ -786,10 +742,14 @@ class AnalyticalLayerLoader: f"{self.df_clean['norm_value_1_100'].max():.2f}" ) - # Log distribusi kondisi berdasarkan threshold - self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition) + self.df_clean['_condition_preview'] = ( + self.df_clean['norm_value_1_100'].apply(assign_condition) + ) cond_dist = self.df_clean['_condition_preview'].value_counts() - self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):") + self.logger.info( + f"\n Distribusi kondisi " + f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):" + ) for cond, cnt in cond_dist.items(): self.logger.info(f" {cond}: {cnt:,} rows") self.df_clean = self.df_clean.drop(columns=['_condition_preview']) @@ -862,45 +822,39 @@ class AnalyticalLayerLoader: 'start_year', 'end_year', 'country_count' ] - # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split) - ind_fw = ( - self.df_clean + fw_at_end = ( + self.df_clean[self.df_clean['year'] == self.end_year] .groupby('indicator_id')['framework'] - .unique() + .first() .reset_index() ) - ind_fw['framework_label'] = ind_fw['framework'].apply( - lambda x: '/'.join(sorted(x)) - ) - indicator_details = indicator_details.merge( - ind_fw[['indicator_id', 'framework_label']], - on='indicator_id', how='left' - ) + indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left') + indicator_details['framework'] = indicator_details['framework'].fillna('MDGs') indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) indicator_details = indicator_details.sort_values( - ['framework_label', 'pillar_name', 'start_year', 'indicator_name'] + ['framework', 'pillar_name', 'start_year', 'indicator_name'] ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - self.logger.info(f"Framework breakdown (per indicator label):") - for fw, count in indicator_details.groupby('framework_label').size().items(): + self.logger.info(f"Framework breakdown (at end_year={self.end_year}):") + for fw, count in indicator_details.groupby('framework').size().items(): self.logger.info(f" {fw}: {count} indicators") - self.logger.info(f"\n{'-'*115}") + self.logger.info(f"\n{'-'*110}") self.logger.info( f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " - f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}" + f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" ) - self.logger.info(f"{'-'*115}") + self.logger.info(f"{'-'*110}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} " + f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) @@ -963,22 +917,20 @@ class AnalyticalLayerLoader: self.logger.info(f" Total rows: {len(analytical_df):,}") - # Framework distribution per row fw_dist_rows = analytical_df['framework'].value_counts() self.logger.info(f" Framework distribution (rows):") for fw, cnt in fw_dist_rows.items(): self.logger.info(f" {fw}: {cnt:,} rows") - # Framework distribution per indikator (label) - ind_fw_label = ( - analytical_df - .groupby('indicator_id')['framework'] - .unique() - .apply(lambda x: '/'.join(sorted(x))) + fw_dist_ind = ( + analytical_df[analytical_df['year'] == self.end_year] + .drop_duplicates('indicator_id')['framework'] .value_counts() ) - self.logger.info(f" Framework distribution (per indicator label):") - for fw, cnt in ind_fw_label.items(): + self.logger.info( + f" Framework distribution (indicators at year={self.end_year}):" + ) + for fw, cnt in fw_dist_ind.items(): self.logger.info(f" {fw}: {cnt} indicators") self.logger.info( @@ -1022,26 +974,30 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_start_year' : self.sdg_start_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'framework_assignment' : 'per-row by year (not per-indicator)', - 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS), - 'condition_thresholds' : { + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_transition_year' : self.sdg_transition_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_logic' : ( + 'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; ' + 'SDG-only year >= sdg_transition_year → SDGs; ' + 'SDG-only year < sdg_transition_year → MDGs (data tetap ada); ' + 'non-SDG-only → MDGs selalu' + ), + 'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS), + 'condition_thresholds' : { 'bad' : f'< {THRESHOLD_BAD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'good' : f'> {THRESHOLD_GOOD}', }, }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), - 'sdg_start_year' : self.sdg_start_year, - 'framework_dist_rows' : fw_dist_rows.to_dict(), - 'framework_dist_inds' : ind_fw_label.to_dict(), + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), + 'sdg_transition_year': self.sdg_transition_year, + 'framework_dist_rows': fw_dist_rows.to_dict(), }) } save_etl_metadata(self.client, metadata) @@ -1064,9 +1020,11 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") - self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)") - self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + self.logger.info( + "Framework: SDG-only indicators → SDGs mulai sdg_transition_year, " + "MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu." + ) self.logger.info("=" * 80) self.load_source_data() @@ -1074,10 +1032,10 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.determine_sdg_start_year() # Step 6: per-row framework assignment + self.determine_sdg_start_year() self.verify_no_gaps() - self.calculate_norm_value() # Step 8: norm_value_1_100 - self.calculate_yoy() # Step 9: yoy_change, yoy_pct + self.calculate_norm_value() + self.calculate_yoy() self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -1087,12 +1045,12 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -1116,8 +1074,11 @@ if __name__ == "__main__": print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") - print(f"Framework: per-row by year | SDG Proxy: FIES only") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + print( + "Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. " + "Non-SDG-only → MDGs selalu." + ) print("=" * 80) logger = setup_logging() @@ -1127,6 +1088,6 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") - print(f" SDG Start Year : {loader.sdg_start_year}") - print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") + print(f" SDG Transition Year : {loader.sdg_transition_year}") + print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file