diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 3b61593..9cecb19 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,31 +8,27 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries + → TIDAK menghapus baris year < max_start_year + → Semua baris tetap ada; label framework ditentukan di Step 6 6. Assign framework (MDGs/SDGs) per indicator PER ROW -7. Verify no gaps -8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) + → Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu + → Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs' + → Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs' + → sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators + yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries) +7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global) +8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global) 9. Calculate YoY per indicator per country 10. Analyze indicator availability by year 11. Save analytical table -NORMALISASI (Step 8): -- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100 -- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik -- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable -- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio - -FRAMEWORK LOGIC (FIX - Per Indicator, Per Row): -- Framework di-assign PER BARIS dengan mempertimbangkan actual_start_year MASING-MASING - indikator, bukan satu sdg_start_year global. -- Logika: - * Jika nama indikator TIDAK ada di SDG_ONLY_KEYWORDS → selalu 'MDGs' (semua tahun) - * Jika nama indikator ADA di SDG_ONLY_KEYWORDS: - - row['year'] >= actual_start_year[indicator] → 'SDGs' - - row['year'] < actual_start_year[indicator] → 'MDGs' -- Baris dengan year < actual_start_year TETAP ADA di data (tidak dihapus di Step 5), - hanya mendapat label 'MDGs'. -- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter +FRAMEWORK LOGIC: +- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators +- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA + sehingga label berubah serentak di satu titik waktu +- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) +- Baris mulai sdg_transition_year → 'SDGs' +- Indikator non-SDG-only → 'MDGs' selalu """ import pandas as pd @@ -62,7 +58,7 @@ from google.cloud import bigquery # SDG-ONLY INDICATOR KEYWORDS # ============================================================================= # Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. -# Indikator di set ini → 'SDGs' mulai dari actual_start_year indikator tersebut. +# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year. # Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun. SDG_ONLY_KEYWORDS = frozenset([ @@ -122,13 +118,14 @@ class AnalyticalLayerLoader: norm_value_1_100, yoy_change, yoy_pct - FRAMEWORK LOGIC (FIX): + FRAMEWORK LOGIC: - Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun - Indikator DI SDG_ONLY_KEYWORDS: - year >= actual_start_year[indikator] → 'SDGs' - year < actual_start_year[indikator] → 'MDGs' - - actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter - - Baris year < actual_start_year TETAP ADA, hanya berlabel 'MDGs' + year < sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) + year >= sdg_transition_year → 'SDGs' + - sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators + yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan + sdg_transition_year yang SAMA agar label berubah serentak. """ def __init__(self, client: bigquery.Client): @@ -143,13 +140,12 @@ class AnalyticalLayerLoader: self.selected_country_ids = None self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5) + self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6) self.start_year = 2013 self.end_year = None self.baseline_year = 2023 - self.sdg_start_year = None # disimpan untuk metadata/logging saja - self.pipeline_metadata = { 'source_class' : self.__class__.__name__, 'start_time' : None, @@ -398,6 +394,8 @@ class AnalyticalLayerLoader: self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("=" * 80) + # Hitung max_start_year per indikator = max(min_year per country) + # = tahun pertama di mana SEMUA fixed countries sudah punya data indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() @@ -426,6 +424,8 @@ class AnalyticalLayerLoader: }) continue + # Cek apakah semua tahun dari max_start s/d end_year + # hadir di SEMUA fixed countries expected_years = list(range(max_start, self.end_year + 1)) ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] all_years_complete = True @@ -452,50 +452,53 @@ class AnalyticalLayerLoader: raise ValueError("No valid indicators found after filtering!") # ---------------------------------------------------------------- - # Filter hanya indikator yang valid - # TIDAK menghapus baris year < max_start_year — - # semua baris tetap ada, label framework ditentukan di Step 6 + # Filter hanya indikator yang valid. + # PENTING: TIDAK menghapus baris year < max_start_year. + # Semua baris tetap ada — label framework ditentukan di Step 6. + # max_start_year disimpan sebagai lookup untuk Step 6 & 7. # ---------------------------------------------------------------- original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() - # Simpan max_start_year sebagai lookup untuk Step 6 + # Simpan max_start_year per indicator_id untuk Step 6 dan Step 7 self.indicator_max_start_map = ( indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] .set_index('indicator_id')['max_start_year'] .to_dict() ) - self.logger.info(f"\n Rows before: {original_count:,}") - self.logger.info(f" Rows after: {len(self.df_clean):,}") - self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") - self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") + self.logger.info(f"\n Rows before : {original_count:,}") + self.logger.info(f" Rows after : {len(self.df_clean):,}") + self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}") + self.logger.info( + f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. " + f"Label framework akan ditentukan di Step 6." + ) return self.df_clean # ------------------------------------------------------------------ - # STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year) + # STEP 6: ASSIGN FRAMEWORK PER ROW # ------------------------------------------------------------------ def determine_sdg_start_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)") + self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") self.logger.info("=" * 80) # ---------------------------------------------------------------- - # Hitung actual_start_year PER INDIKATOR dari indicator_max_start_map - # yang sudah dihitung di Step 5. - # actual_start_year = max(min_year per country) per indikator - # = tahun di mana semua fixed countries sudah punya data + # Bangun tabel actual_start_year per indikator dari + # indicator_max_start_map yang sudah ditetapkan di Step 5. # ---------------------------------------------------------------- indicator_actual_start = pd.DataFrame([ - {'indicator_id': ind_id, 'actual_start_year': start_yr} + {'indicator_id': ind_id, 'actual_start_year': int(start_yr)} for ind_id, start_yr in self.indicator_max_start_map.items() ]) - # Merge indicator_name untuk keperluan logging + # Merge indicator_name untuk logging indicator_actual_start = indicator_actual_start.merge( self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(), on='indicator_id', how='left' @@ -508,91 +511,95 @@ class AnalyticalLayerLoader: .isin(SDG_ONLY_KEYWORDS) ) - # sdg_start_year global = min(actual_start_year dari SDG-only indicators) - # Disimpan hanya untuk metadata/logging + # ---------------------------------------------------------------- + # sdg_transition_year = min(actual_start_year) dari semua SDG-only + # indicators yang lolos filter. + # Ini adalah satu titik waktu di mana semua SDG-only indicators + # berubah dari 'MDGs' ke 'SDGs' secara SERENTAK. + # ---------------------------------------------------------------- sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']] if sdg_only_df.empty: raise ValueError( "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " "Pastikan indikator FIES dan anaemia ada di data." ) - self.sdg_start_year = int(sdg_only_df['actual_start_year'].min()) + + self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min()) self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:") self.logger.info(f" {'-'*80}") - for _, row in indicator_actual_start[indicator_actual_start['is_sdg_only']].iterrows(): + for _, row in sdg_only_df.iterrows(): self.logger.info( - f" [SDG-only] start={int(row['actual_start_year'])} | {row['indicator_name']}" + f" [SDG-only] actual_start={int(row['actual_start_year'])} | " + f"{row['indicator_name']}" ) + self.logger.info( - f"\n sdg_start_year (earliest SDG-only, for metadata): {self.sdg_start_year}" + f"\n sdg_transition_year = {self.sdg_transition_year} " + f"(min actual_start_year dari semua SDG-only indicators)" ) - # Lookup: indicator_id → actual_start_year (hanya SDG-only, untuk logging) - sdg_only_start_map = ( - indicator_actual_start[indicator_actual_start['is_sdg_only']] - .set_index('indicator_id')['actual_start_year'] - .to_dict() - ) - - self.logger.info(f"\n Logika assign framework (PER BARIS, PER INDIKATOR):") - self.logger.info(f" ─────────────────────────────────────────────────────") - self.logger.info(f" Jika indikator TIDAK di SDG_ONLY_KEYWORDS:") - self.logger.info(f" → 'MDGs' di semua tahun (shared indicators)") - self.logger.info(f" Jika indikator DI SDG_ONLY_KEYWORDS:") - self.logger.info(f" year >= actual_start_year[indikator] → 'SDGs'") - self.logger.info(f" year < actual_start_year[indikator] → 'MDGs'") - self.logger.info(f" ─────────────────────────────────────────────────────") + self.logger.info(f"\n Logika assign framework (PER BARIS):") + self.logger.info(f" ──────────────────────────────────────────────────────────") + self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:") + self.logger.info(f" → 'MDGs' di semua tahun") + self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:") + self.logger.info(f" year < {self.sdg_transition_year} → 'MDGs' (data tetap ada)") + self.logger.info(f" year >= {self.sdg_transition_year} → 'SDGs'") + self.logger.info(f" ──────────────────────────────────────────────────────────") # ---------------------------------------------------------------- - # Assign framework dengan vectorized merge + # Assign framework dengan vectorized operation menggunakan + # sdg_transition_year (SATU nilai untuk semua SDG-only indicators) # ---------------------------------------------------------------- - self.df_clean = self.df_clean.merge( - indicator_actual_start[['indicator_id', 'is_sdg_only', 'actual_start_year']], - on='indicator_id', - how='left' + # Tandai apakah setiap baris adalah SDG-only indicator + sdg_only_ids = set( + indicator_actual_start.loc[ + indicator_actual_start['is_sdg_only'], 'indicator_id' + ] ) + self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids) # Assign framework: - # - Jika bukan SDG-only → 'MDGs' - # - Jika SDG-only AND year >= actual_start_year → 'SDGs' - # - Jika SDG-only AND year < actual_start_year → 'MDGs' + # - Bukan SDG-only → 'MDGs' + # - SDG-only AND year >= sdg_transition_year → 'SDGs' + # - SDG-only AND year < sdg_transition_year → 'MDGs' self.df_clean['framework'] = np.where( - self.df_clean['is_sdg_only'] & (self.df_clean['year'] >= self.df_clean['actual_start_year']), + self.df_clean['_is_sdg_only'] & + (self.df_clean['year'] >= self.sdg_transition_year), 'SDGs', 'MDGs' ) # Drop kolom bantu - self.df_clean = self.df_clean.drop(columns=['is_sdg_only', 'actual_start_year']) + self.df_clean = self.df_clean.drop(columns=['_is_sdg_only']) # ---------------------------------------------------------------- # Log verifikasi per indikator # ---------------------------------------------------------------- self.logger.info(f"\n Verifikasi framework per indikator:") - self.logger.info(f" {'-'*105}") + self.logger.info(f" {'-'*110}") self.logger.info( - f" {'ID':<5} {'Indicator Name':<52} {'Start':<8} " - f"{'MDGs rows':<12} {'SDGs rows':<12} {'Expected'}" + f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} " + f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}" ) - self.logger.info(f" {'-'*105}") + self.logger.info(f" {'-'*110}") for ind_id, grp in self.df_clean.groupby('indicator_id'): ind_name = grp['indicator_name'].iloc[0] mdgs_rows = (grp['framework'] == 'MDGs').sum() sdgs_rows = (grp['framework'] == 'SDGs').sum() - is_sdg_only = ind_name.lower().strip() in SDG_ONLY_KEYWORDS - start_yr = int(grp['year'].min()) + is_sdg_only = ind_id in sdg_only_ids + data_from = int(grp['year'].min()) if is_sdg_only: - ind_start = sdg_only_start_map.get(ind_id, '?') - expected = f"SDGs from {ind_start}, MDGs before" + note = f"SDGs from {self.sdg_transition_year}, MDGs before" else: - expected = "MDGs always" + note = "MDGs always" self.logger.info( - f" {int(ind_id):<5} {ind_name[:50]:<52} {start_yr:<8} " - f"{mdgs_rows:<12} {sdgs_rows:<12} {expected}" + f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} " + f"{mdgs_rows:<12} {sdgs_rows:<12} {note}" ) fw_summary = self.df_clean['framework'].value_counts() @@ -626,6 +633,8 @@ class AnalyticalLayerLoader: # Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya, # bukan dari self.start_year global, karena tiap indikator bisa # punya start year berbeda. + # Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek + # karena memang tidak semua country punya data di sana. # ---------------------------------------------------------------- expected_countries = len(self.selected_country_ids) all_good = True @@ -650,7 +659,8 @@ class AnalyticalLayerLoader: if all_good: self.logger.info( - f" VERIFICATION PASSED — all combinations have {expected_countries} countries" + f" VERIFICATION PASSED — all combinations from actual_start_year " + f"have {expected_countries} countries" ) else: for row in bad_rows[:10]: @@ -683,7 +693,10 @@ class AnalyticalLayerLoader: norm_parts = [] indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) - self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}") + self.logger.info( + f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} " + f"{'Min':>10} {'Max':>10} {'Indicator Name'}" + ) self.logger.info(f" {'-'*90}") for (ind_id, ind_name, direction), grp in indicators: @@ -729,9 +742,14 @@ class AnalyticalLayerLoader: f"{self.df_clean['norm_value_1_100'].max():.2f}" ) - self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition) + self.df_clean['_condition_preview'] = ( + self.df_clean['norm_value_1_100'].apply(assign_condition) + ) cond_dist = self.df_clean['_condition_preview'].value_counts() - self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):") + self.logger.info( + f"\n Distribusi kondisi " + f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):" + ) for cond, cnt in cond_dist.items(): self.logger.info(f" {cond}: {cnt:,} rows") self.df_clean = self.df_clean.drop(columns=['_condition_preview']) @@ -909,7 +927,9 @@ class AnalyticalLayerLoader: .drop_duplicates('indicator_id')['framework'] .value_counts() ) - self.logger.info(f" Framework distribution (indicators at year={self.end_year}):") + self.logger.info( + f" Framework distribution (indicators at year={self.end_year}):" + ) for fw, cnt in fw_dist_ind.items(): self.logger.info(f" {fw}: {cnt} indicators") @@ -954,28 +974,29 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_start_year' : self.sdg_start_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'framework_logic' : ( - 'per-indicator actual_start_year: ' - 'SDG-only indicator → SDGs from its own actual_start_year, MDGs before; ' - 'shared/other indicators → MDGs always' + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_transition_year' : self.sdg_transition_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_logic' : ( + 'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; ' + 'SDG-only year >= sdg_transition_year → SDGs; ' + 'SDG-only year < sdg_transition_year → MDGs (data tetap ada); ' + 'non-SDG-only → MDGs selalu' ), - 'sdg_only_keywords_count' : len(SDG_ONLY_KEYWORDS), - 'condition_thresholds' : { + 'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS), + 'condition_thresholds' : { 'bad' : f'< {THRESHOLD_BAD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'good' : f'> {THRESHOLD_GOOD}', }, }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators': int(self.df_clean['indicator_id'].nunique()), - 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), + 'sdg_transition_year': self.sdg_transition_year, 'framework_dist_rows': fw_dist_rows.to_dict(), }) } @@ -1000,7 +1021,10 @@ class AnalyticalLayerLoader: self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)") + self.logger.info( + "Framework: SDG-only indicators → SDGs mulai sdg_transition_year, " + "MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu." + ) self.logger.info("=" * 80) self.load_source_data() @@ -1021,12 +1045,12 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -1051,7 +1075,10 @@ if __name__ == "__main__": print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - print("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)") + print( + "Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. " + "Non-SDG-only → MDGs selalu." + ) print("=" * 80) logger = setup_logging() @@ -1061,6 +1088,6 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") - print(f" SDG Start Year : {loader.sdg_start_year}") - print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") + print(f" SDG Transition Year : {loader.sdg_transition_year}") + print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file