From 0f93ff6ecdb131824b1b5bcf129cbbfc3e7cebdd Mon Sep 17 00:00:00 2001 From: Debby Date: Wed, 1 Apr 2026 08:29:18 +0700 Subject: [PATCH] try1 --- scripts/bigquery_analytical_layer.py | 593 ++++++++++++++------------- 1 file changed, 316 insertions(+), 277 deletions(-) diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 9cecb19..bf1381e 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,27 +8,32 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries - → TIDAK menghapus baris year < max_start_year - → Semua baris tetap ada; label framework ditentukan di Step 6 -6. Assign framework (MDGs/SDGs) per indicator PER ROW - → Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu - → Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs' - → Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs' - → sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators - yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries) -7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global) -8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global) +6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year +7. Verify no gaps +8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) 9. Calculate YoY per indicator per country 10. Analyze indicator availability by year 11. Save analytical table -FRAMEWORK LOGIC: -- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators -- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA - sehingga label berubah serentak di satu titik waktu -- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) -- Baris mulai sdg_transition_year → 'SDGs' -- Indikator non-SDG-only → 'MDGs' selalu +NORMALISASI (Step 8): +- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100 +- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik +- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus) + sehingga nilai antar negara dan antar tahun tetap comparable +- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio + +FRAMEWORK LOGIC (Per-Row, bukan per indikator): +- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap + di semua fixed countries (setelah Step 3-5 filter selesai) +- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure") + Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs +- Framework di-assign PER BARIS (per year), bukan per indikator: + * row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' + * Selain itu -> 'MDGs' +- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment) + yang datanya ada sebelum SDGs: + * row lama (year < sdg_start_year) -> 'MDGs' + * row baru (year >= sdg_start_year) -> 'SDGs' """ import pandas as pd @@ -55,14 +60,17 @@ from google.cloud import bigquery # ============================================================================= -# SDG-ONLY INDICATOR KEYWORDS +# SDG INDICATOR KEYWORDS +# Daftar nama indikator (lowercase) yang masuk SDG framework. +# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year, +# dan 'MDGs' untuk baris dengan year < sdg_start_year. # ============================================================================= -# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. -# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year. -# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun. -SDG_ONLY_KEYWORDS = frozenset([ - # TARGET 2.1.2 — FIES (SDGs only) +SDG_INDICATOR_KEYWORDS = frozenset([ + # TARGET 2.1.1 — Prevalence of undernourishment (shared: ada sebelum SDGs) + "prevalence of undernourishment (percent) (3-year average)", + "number of people undernourished (million) (3-year average)", + # TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", @@ -75,14 +83,45 @@ SDG_ONLY_KEYWORDS = frozenset([ "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.3 — Anaemia (SDGs only) + # TARGET 2.2.1 — Stunting (shared: ada sebelum SDGs) + "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", + "number of children under 5 years of age who are stunted (modeled estimates) (million)", + # TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs) + "percentage of children under 5 years affected by wasting (percent)", + "number of children under 5 years affected by wasting (million)", + "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", + "number of children under 5 years of age who are overweight (modeled estimates) (million)", + # TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar + # baris >= sdg_start_year di-assign 'SDGs') "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) +# ============================================================================= +# SDG ERA PROXY KEYWORDS +# HANYA indikator yang MURNI baru di era SDGs (FIES saja). +# Dipakai untuk mendeteksi sdg_start_year dari data. +# +# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy: +# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya +# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year +# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'. +# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs +# dan dapat dipakai sebagai penanda tahun mulainya era SDGs. +# ============================================================================= +_SDG_ERA_PROXY_KEYWORDS = frozenset([ + "food insecurity", + "food insecure", +]) + # ============================================================================= # THRESHOLD KONDISI (fixed absolute, skala 1-100) # ============================================================================= +# Digunakan untuk assign kondisi di analysis_layer. +# Didefinisikan di sini agar konsisten antara kedua file. +# bad : norm_value_1_100 < THRESHOLD_BAD +# good : norm_value_1_100 > THRESHOLD_GOOD +# moderate : di antara keduanya THRESHOLD_BAD = 40.0 THRESHOLD_GOOD = 60.0 @@ -91,6 +130,8 @@ THRESHOLD_GOOD = 60.0 def assign_condition(norm_value_1_100: float) -> str: """ Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). + Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert). + Returns: 'good' / 'moderate' / 'bad' """ if pd.isna(norm_value_1_100): @@ -102,6 +143,44 @@ def assign_condition(norm_value_1_100: float) -> str: return 'moderate' +def assign_framework_per_row( + indicator_name: str, + year: int, + sdg_start_year: int, +) -> str: + """ + Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator. + + Logic: + - 'SDGs' jika KEDUA kondisi terpenuhi: + 1. Nama indikator ada di SDG_INDICATOR_KEYWORDS + 2. year (tahun baris ini) >= sdg_start_year + - 'MDGs' untuk semua kasus lain. + + Mengapa per row, bukan per indikator? + Indikator "shared" seperti anemia, stunting, wasting, undernourishment + memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan + per indikator menggunakan actual_start_year, indikator-indikator ini + akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year. + Dengan assign per row menggunakan year baris: + - baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs) + - baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs) + + Contoh anemia (sdg_start_year = 2016): + - row year=2013 -> 'MDGs' + - row year=2014 -> 'MDGs' + - row year=2015 -> 'MDGs' + - row year=2016 -> 'SDGs' + - row year=2017 -> 'SDGs' + - ... + """ + name_lower = str(indicator_name).lower().strip() + in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS + if in_sdg_list and int(year) >= sdg_start_year: + return 'SDGs' + return 'MDGs' + + # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -115,17 +194,14 @@ class AnalyticalLayerLoader: indicator_id, indicator_name, direction, framework, pillar_id, pillar_name, time_id, year, value, - norm_value_1_100, + norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware yoy_change, yoy_pct - FRAMEWORK LOGIC: - - Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun - - Indikator DI SDG_ONLY_KEYWORDS: - year < sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) - year >= sdg_transition_year → 'SDGs' - - sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators - yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan - sdg_transition_year yang SAMA agar label berubah serentak. + Catatan framework: + Framework di-assign PER BARIS (per year), sehingga indikator shared + seperti anemia dapat memiliki framework berbeda di baris yang berbeda: + - baris sebelum sdg_start_year -> 'MDGs' + - baris sejak sdg_start_year -> 'SDGs' """ def __init__(self, client: bigquery.Client): @@ -138,13 +214,13 @@ class AnalyticalLayerLoader: self.df_country = None self.df_pillar = None - self.selected_country_ids = None - self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5) - self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6) + self.selected_country_ids = None self.start_year = 2013 self.end_year = None - self.baseline_year = 2023 + self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap) + + self.sdg_start_year = None self.pipeline_metadata = { 'source_class' : self.__class__.__name__, @@ -230,6 +306,15 @@ class AnalyticalLayerLoader: self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) + # Filter single years only (is_year_range == False) + if 'is_year_range' in self.df_clean.columns: + before = len(self.df_clean) + self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy() + self.logger.info( + f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows" + ) + + # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] baseline_indicator_count = df_baseline['indicator_id'].nunique() @@ -394,8 +479,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("=" * 80) - # Hitung max_start_year per indikator = max(min_year per country) - # = tahun pertama di mana SEMUA fixed countries sudah punya data indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() @@ -424,8 +507,6 @@ class AnalyticalLayerLoader: }) continue - # Cek apakah semua tahun dari max_start s/d end_year - # hadir di SEMUA fixed countries expected_years = list(range(max_start, self.end_year + 1)) ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] all_years_complete = True @@ -448,173 +529,140 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") + if removed_indicators: + self.logger.info(f"\n Removed indicators:") + for item in removed_indicators: + self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}") + if not valid_indicators: raise ValueError("No valid indicators found after filtering!") - # ---------------------------------------------------------------- - # Filter hanya indikator yang valid. - # PENTING: TIDAK menghapus baris year < max_start_year. - # Semua baris tetap ada — label framework ditentukan di Step 6. - # max_start_year disimpan sebagai lookup untuk Step 6 & 7. - # ---------------------------------------------------------------- original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() - # Simpan max_start_year per indicator_id untuk Step 6 dan Step 7 - self.indicator_max_start_map = ( - indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] - .set_index('indicator_id')['max_start_year'] - .to_dict() + self.df_clean = self.df_clean.merge( + indicator_max_start[['indicator_id', 'max_start_year']], + on='indicator_id', how='left' ) + self.df_clean = self.df_clean[ + self.df_clean['year'] >= self.df_clean['max_start_year'] + ].copy() + self.df_clean = self.df_clean.drop('max_start_year', axis=1) - self.logger.info(f"\n Rows before : {original_count:,}") - self.logger.info(f" Rows after : {len(self.df_clean):,}") - self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}") - self.logger.info( - f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. " - f"Label framework akan ditentukan di Step 6." - ) + self.logger.info(f"\n Rows before: {original_count:,}") + self.logger.info(f" Rows after: {len(self.df_clean):,}") + self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") + self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") return self.df_clean # ------------------------------------------------------------------ - # STEP 6: ASSIGN FRAMEWORK PER ROW + # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW # ------------------------------------------------------------------ def determine_sdg_start_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") + self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW") self.logger.info("=" * 80) + self.logger.info( + " Proxy: FIES only (food insecurity/food insecure).\n" + " Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n" + " Framework di-assign PER BARIS (year), bukan per indikator." + ) + + # actual_start_year per indikator = max(min_year per country) + # = konsisten dengan max_start_year di Step 5 + indicator_actual_start = ( + self.df_clean + .groupby(['indicator_id', 'indicator_name', 'country_id'])['year'] + .min().reset_index() + .groupby(['indicator_id', 'indicator_name'])['year'] + .max().reset_index() + ) + indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] + + # Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia) + proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply( + lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS) + ) + df_proxy = indicator_actual_start[proxy_mask] + + if df_proxy.empty: + raise ValueError( + "Tidak ada indikator proxy SDGs (FIES) yang lolos filter. " + "Pastikan indikator FIES (food insecurity/food insecure) ada di data." + ) + + self.sdg_start_year = int(df_proxy['actual_start_year'].min()) + self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}") + self.logger.info(f" Proxy indicators (FIES only):") + for _, row in df_proxy.iterrows(): + self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") # ---------------------------------------------------------------- - # Bangun tabel actual_start_year per indikator dari - # indicator_max_start_map yang sudah ditetapkan di Step 5. + # Assign framework PER BARIS menggunakan year baris, bukan actual_start_year + # Sehingga indikator "shared" (anemia, stunting, dll) mendapat: + # - 'MDGs' untuk baris sebelum sdg_start_year + # - 'SDGs' untuk baris sejak sdg_start_year # ---------------------------------------------------------------- - indicator_actual_start = pd.DataFrame([ - {'indicator_id': ind_id, 'actual_start_year': int(start_yr)} - for ind_id, start_yr in self.indicator_max_start_map.items() - ]) + self.df_clean['framework'] = self.df_clean.apply( + lambda row: assign_framework_per_row( + indicator_name = row['indicator_name'], + year = int(row['year']), + sdg_start_year = self.sdg_start_year, + ), + axis=1 + ) - # Merge indicator_name untuk logging - indicator_actual_start = indicator_actual_start.merge( - self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(), + # ---------------------------------------------------------------- + # Logging: ringkasan per indikator (frameworks apa yang muncul) + # ---------------------------------------------------------------- + ind_fw_summary = ( + self.df_clean + .groupby(['indicator_id', 'indicator_name'])['framework'] + .unique() + .reset_index() + ) + ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply( + lambda x: '/'.join(sorted(x)) + ) + ind_fw_summary = ind_fw_summary.merge( + indicator_actual_start[['indicator_id', 'actual_start_year']], on='indicator_id', how='left' ) - # Tandai mana yang SDG-only - indicator_actual_start['is_sdg_only'] = ( - indicator_actual_start['indicator_name'] - .str.lower().str.strip() - .isin(SDG_ONLY_KEYWORDS) - ) - - # ---------------------------------------------------------------- - # sdg_transition_year = min(actual_start_year) dari semua SDG-only - # indicators yang lolos filter. - # Ini adalah satu titik waktu di mana semua SDG-only indicators - # berubah dari 'MDGs' ke 'SDGs' secara SERENTAK. - # ---------------------------------------------------------------- - sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']] - if sdg_only_df.empty: - raise ValueError( - "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " - "Pastikan indikator FIES dan anaemia ada di data." - ) - - self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min()) - - self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:") - self.logger.info(f" {'-'*80}") - for _, row in sdg_only_df.iterrows(): + self.logger.info(f"\n Framework assignment per indikator:") + self.logger.info(f" {'-'*85}") + self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}") + self.logger.info(f" {'-'*85}") + for _, row in ind_fw_summary.sort_values( + ['frameworks', 'actual_start_year', 'indicator_name'] + ).iterrows(): self.logger.info( - f" [SDG-only] actual_start={int(row['actual_start_year'])} | " - f"{row['indicator_name']}" + f" {int(row['indicator_id']):<5} {row['frameworks']:<18} " + f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}" ) - self.logger.info( - f"\n sdg_transition_year = {self.sdg_transition_year} " - f"(min actual_start_year dari semua SDG-only indicators)" - ) - - self.logger.info(f"\n Logika assign framework (PER BARIS):") - self.logger.info(f" ──────────────────────────────────────────────────────────") - self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:") - self.logger.info(f" → 'MDGs' di semua tahun") - self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:") - self.logger.info(f" year < {self.sdg_transition_year} → 'MDGs' (data tetap ada)") - self.logger.info(f" year >= {self.sdg_transition_year} → 'SDGs'") - self.logger.info(f" ──────────────────────────────────────────────────────────") - - # ---------------------------------------------------------------- - # Assign framework dengan vectorized operation menggunakan - # sdg_transition_year (SATU nilai untuk semua SDG-only indicators) - # ---------------------------------------------------------------- - # Tandai apakah setiap baris adalah SDG-only indicator - sdg_only_ids = set( - indicator_actual_start.loc[ - indicator_actual_start['is_sdg_only'], 'indicator_id' - ] - ) - self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids) - - # Assign framework: - # - Bukan SDG-only → 'MDGs' - # - SDG-only AND year >= sdg_transition_year → 'SDGs' - # - SDG-only AND year < sdg_transition_year → 'MDGs' - self.df_clean['framework'] = np.where( - self.df_clean['_is_sdg_only'] & - (self.df_clean['year'] >= self.sdg_transition_year), - 'SDGs', - 'MDGs' - ) - - # Drop kolom bantu - self.df_clean = self.df_clean.drop(columns=['_is_sdg_only']) - - # ---------------------------------------------------------------- - # Log verifikasi per indikator - # ---------------------------------------------------------------- - self.logger.info(f"\n Verifikasi framework per indikator:") - self.logger.info(f" {'-'*110}") - self.logger.info( - f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} " - f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}" - ) - self.logger.info(f" {'-'*110}") - - for ind_id, grp in self.df_clean.groupby('indicator_id'): - ind_name = grp['indicator_name'].iloc[0] - mdgs_rows = (grp['framework'] == 'MDGs').sum() - sdgs_rows = (grp['framework'] == 'SDGs').sum() - is_sdg_only = ind_id in sdg_only_ids - data_from = int(grp['year'].min()) - - if is_sdg_only: - note = f"SDGs from {self.sdg_transition_year}, MDGs before" - else: - note = "MDGs always" - + # Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi + split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs'] + if not split_inds.empty: self.logger.info( - f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} " - f"{mdgs_rows:<12} {sdgs_rows:<12} {note}" + f"\n [INFO] {len(split_inds)} indikator memiliki framework split " + f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):" ) + for _, row in split_inds.iterrows(): + self.logger.info(f" - {row['indicator_name'][:60]}") fw_summary = self.df_clean['framework'].value_counts() - self.logger.info(f"\n Ringkasan rows: " + " | ".join( - f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items() - )) - - end_year_df = self.df_clean[self.df_clean['year'] == self.end_year] - fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique() - self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join( - f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items() - )) + self.logger.info( + f"\n Ringkasan rows: " + + " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()) + ) self.logger.info( - f"\n [OK] 'framework' ditambahkan — " + f"\n [OK] 'framework' ditambahkan per row — " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" ) @@ -629,44 +677,23 @@ class AnalyticalLayerLoader: self.logger.info("STEP 7: VERIFY NO GAPS") self.logger.info("=" * 80) - # ---------------------------------------------------------------- - # Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya, - # bukan dari self.start_year global, karena tiap indikator bisa - # punya start year berbeda. - # Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek - # karena memang tidak semua country punya data di sana. - # ---------------------------------------------------------------- expected_countries = len(self.selected_country_ids) - all_good = True - bad_rows = [] - - for ind_id, grp in self.df_clean.groupby('indicator_id'): - actual_start = self.indicator_max_start_map.get(ind_id) - if actual_start is None: - continue - - expected_years = list(range(int(actual_start), self.end_year + 1)) - - for year in expected_years: - country_count = grp[grp['year'] == year]['country_id'].nunique() - if country_count != expected_countries: - all_good = False - bad_rows.append({ - 'indicator_id' : int(ind_id), - 'year' : int(year), - 'country_count': int(country_count), - }) + verification = self.df_clean.groupby( + ['indicator_id', 'year'] + )['country_id'].nunique().reset_index() + verification.columns = ['indicator_id', 'year', 'country_count'] + all_good = (verification['country_count'] == expected_countries).all() if all_good: self.logger.info( - f" VERIFICATION PASSED — all combinations from actual_start_year " - f"have {expected_countries} countries" + f" VERIFICATION PASSED — all combinations have {expected_countries} countries" ) else: - for row in bad_rows[:10]: + bad = verification[verification['country_count'] != expected_countries] + for _, row in bad.head(10).iterrows(): self.logger.error( - f" Indicator {row['indicator_id']}, Year {row['year']}: " - f"{row['country_count']} countries (expected {expected_countries})" + f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: " + f"{int(row['country_count'])} countries (expected {expected_countries})" ) raise ValueError("Gap verification failed!") @@ -679,7 +706,22 @@ class AnalyticalLayerLoader: def calculate_norm_value(self): """ Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, - direction-aware, global per indikator (semua negara + semua tahun). + direction-aware. + + CARA KERJA: + - Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus) + sehingga nilai antar negara dan antar tahun tetap comparable. + - lower_better diinvert: nilai tinggi selalu = kondisi lebih baik. + Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert. + - Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio. + - Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan + (persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama. + + Catatan: + - Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer + yang skala 0-1 dan dipakai untuk agregasi composite score. + - norm_value_1_100 ini adalah per baris (per country per year per indicator), + untuk ditampilkan langsung di Looker Studio. """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") @@ -693,10 +735,7 @@ class AnalyticalLayerLoader: norm_parts = [] indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) - self.logger.info( - f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} " - f"{'Min':>10} {'Max':>10} {'Indicator Name'}" - ) + self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}") self.logger.info(f" {'-'*90}") for (ind_id, ind_name, direction), grp in indicators: @@ -710,17 +749,21 @@ class AnalyticalLayerLoader: norm_parts.append(grp) continue - raw = grp.loc[valid_mask, 'value'].values - v_min = raw.min() - v_max = raw.max() - normed = np.full(len(grp), np.nan) + raw = grp.loc[valid_mask, 'value'].values + v_min = raw.min() + v_max = raw.max() + normed = np.full(len(grp), np.nan) if v_min == v_max: + # Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100) normed[valid_mask.values] = 50.5 else: + # Min-max ke 0-1 dulu scaled = (raw - v_min) / (v_max - v_min) + # Invert jika lower_better if do_invert: scaled = 1.0 - scaled + # Scale ke 1-100 normed[valid_mask.values] = 1.0 + scaled * 99.0 grp['norm_value_1_100'] = normed @@ -733,6 +776,7 @@ class AnalyticalLayerLoader: self.df_clean = pd.concat(norm_parts, ignore_index=True) + # Statistik ringkasan valid_norm = self.df_clean['norm_value_1_100'].notna().sum() null_norm = self.df_clean['norm_value_1_100'].isna().sum() self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") @@ -742,14 +786,10 @@ class AnalyticalLayerLoader: f"{self.df_clean['norm_value_1_100'].max():.2f}" ) - self.df_clean['_condition_preview'] = ( - self.df_clean['norm_value_1_100'].apply(assign_condition) - ) + # Log distribusi kondisi berdasarkan threshold + self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition) cond_dist = self.df_clean['_condition_preview'].value_counts() - self.logger.info( - f"\n Distribusi kondisi " - f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):" - ) + self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):") for cond, cnt in cond_dist.items(): self.logger.info(f" {cond}: {cnt:,} rows") self.df_clean = self.df_clean.drop(columns=['_condition_preview']) @@ -822,39 +862,45 @@ class AnalyticalLayerLoader: 'start_year', 'end_year', 'country_count' ] - fw_at_end = ( - self.df_clean[self.df_clean['year'] == self.end_year] + # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split) + ind_fw = ( + self.df_clean .groupby('indicator_id')['framework'] - .first() + .unique() .reset_index() ) - indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left') - indicator_details['framework'] = indicator_details['framework'].fillna('MDGs') + ind_fw['framework_label'] = ind_fw['framework'].apply( + lambda x: '/'.join(sorted(x)) + ) + indicator_details = indicator_details.merge( + ind_fw[['indicator_id', 'framework_label']], + on='indicator_id', how='left' + ) indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) indicator_details = indicator_details.sort_values( - ['framework', 'pillar_name', 'start_year', 'indicator_name'] + ['framework_label', 'pillar_name', 'start_year', 'indicator_name'] ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - self.logger.info(f"Framework breakdown (at end_year={self.end_year}):") - for fw, count in indicator_details.groupby('framework').size().items(): + self.logger.info(f"Framework breakdown (per indicator label):") + for fw, count in indicator_details.groupby('framework_label').size().items(): self.logger.info(f" {fw}: {count} indicators") - self.logger.info(f"\n{'-'*110}") + self.logger.info(f"\n{'-'*115}") self.logger.info( f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " - f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" + f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}" ) - self.logger.info(f"{'-'*110}") + self.logger.info(f"{'-'*115}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " + f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} " f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) @@ -917,20 +963,22 @@ class AnalyticalLayerLoader: self.logger.info(f" Total rows: {len(analytical_df):,}") + # Framework distribution per row fw_dist_rows = analytical_df['framework'].value_counts() self.logger.info(f" Framework distribution (rows):") for fw, cnt in fw_dist_rows.items(): self.logger.info(f" {fw}: {cnt:,} rows") - fw_dist_ind = ( - analytical_df[analytical_df['year'] == self.end_year] - .drop_duplicates('indicator_id')['framework'] + # Framework distribution per indikator (label) + ind_fw_label = ( + analytical_df + .groupby('indicator_id')['framework'] + .unique() + .apply(lambda x: '/'.join(sorted(x))) .value_counts() ) - self.logger.info( - f" Framework distribution (indicators at year={self.end_year}):" - ) - for fw, cnt in fw_dist_ind.items(): + self.logger.info(f" Framework distribution (per indicator label):") + for fw, cnt in ind_fw_label.items(): self.logger.info(f" {fw}: {cnt} indicators") self.logger.info( @@ -974,30 +1022,26 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_transition_year' : self.sdg_transition_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'framework_logic' : ( - 'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; ' - 'SDG-only year >= sdg_transition_year → SDGs; ' - 'SDG-only year < sdg_transition_year → MDGs (data tetap ada); ' - 'non-SDG-only → MDGs selalu' - ), - 'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS), - 'condition_thresholds' : { + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_assignment' : 'per-row by year (not per-indicator)', + 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS), + 'condition_thresholds' : { 'bad' : f'< {THRESHOLD_BAD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'good' : f'> {THRESHOLD_GOOD}', }, }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), - 'sdg_transition_year': self.sdg_transition_year, - 'framework_dist_rows': fw_dist_rows.to_dict(), + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), + 'sdg_start_year' : self.sdg_start_year, + 'framework_dist_rows' : fw_dist_rows.to_dict(), + 'framework_dist_inds' : ind_fw_label.to_dict(), }) } save_etl_metadata(self.client, metadata) @@ -1020,11 +1064,9 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") + self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)") + self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info( - "Framework: SDG-only indicators → SDGs mulai sdg_transition_year, " - "MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu." - ) self.logger.info("=" * 80) self.load_source_data() @@ -1032,10 +1074,10 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.determine_sdg_start_year() + self.determine_sdg_start_year() # Step 6: per-row framework assignment self.verify_no_gaps() - self.calculate_norm_value() - self.calculate_yoy() + self.calculate_norm_value() # Step 8: norm_value_1_100 + self.calculate_yoy() # Step 9: yoy_change, yoy_pct self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -1045,12 +1087,12 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -1074,11 +1116,8 @@ if __name__ == "__main__": print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") + print(f"Framework: per-row by year | SDG Proxy: FIES only") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - print( - "Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. " - "Non-SDG-only → MDGs selalu." - ) print("=" * 80) logger = setup_logging() @@ -1088,6 +1127,6 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") - print(f" SDG Transition Year : {loader.sdg_transition_year}") - print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") + print(f" SDG Start Year : {loader.sdg_start_year}") + print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file