diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 4396922..bf1381e 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,7 +8,7 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Determine SDG start year & assign framework (MDGs/SDGs) per indicator +6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year 7. Verify no gaps 8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) 9. Calculate YoY per indicator per country @@ -22,13 +22,18 @@ NORMALISASI (Step 8): sehingga nilai antar negara dan antar tahun tetap comparable - Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio -FRAMEWORK LOGIC: -- SDG start year dideteksi dari data: tahun pertama indikator FIES lengkap +FRAMEWORK LOGIC (Per-Row, bukan per indikator): +- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap di semua fixed countries (setelah Step 3-5 filter selesai) -- Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS: - * Jika actual_start_year >= sdg_start_year -> 'SDGs' - * Jika actual_start_year < sdg_start_year -> 'MDGs' -- Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs' +- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure") + Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs +- Framework di-assign PER BARIS (per year), bukan per indikator: + * row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' + * Selain itu -> 'MDGs' +- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment) + yang datanya ada sebelum SDGs: + * row lama (year < sdg_start_year) -> 'MDGs' + * row baru (year >= sdg_start_year) -> 'SDGs' """ import pandas as pd @@ -56,13 +61,16 @@ from google.cloud import bigquery # ============================================================================= # SDG INDICATOR KEYWORDS +# Daftar nama indikator (lowercase) yang masuk SDG framework. +# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year, +# dan 'MDGs' untuk baris dengan year < sdg_start_year. # ============================================================================= SDG_INDICATOR_KEYWORDS = frozenset([ - # TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) + # TARGET 2.1.1 — Prevalence of undernourishment (shared: ada sebelum SDGs) "prevalence of undernourishment (percent) (3-year average)", "number of people undernourished (million) (3-year average)", - # TARGET 2.1.2 — FIES (SDGs only) + # TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", @@ -75,24 +83,35 @@ SDG_INDICATOR_KEYWORDS = frozenset([ "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.1 — Stunting (shared) + # TARGET 2.2.1 — Stunting (shared: ada sebelum SDGs) "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", "number of children under 5 years of age who are stunted (modeled estimates) (million)", - # TARGET 2.2.2 — Wasting & Overweight (shared) + # TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs) "percentage of children under 5 years affected by wasting (percent)", "number of children under 5 years affected by wasting (million)", "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", "number of children under 5 years of age who are overweight (modeled estimates) (million)", - # TARGET 2.2.3 — Anaemia (SDGs only) + # TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar + # baris >= sdg_start_year di-assign 'SDGs') "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) -# Proxy keywords untuk deteksi era SDGs dari data (indikator murni baru di SDGs) +# ============================================================================= +# SDG ERA PROXY KEYWORDS +# HANYA indikator yang MURNI baru di era SDGs (FIES saja). +# Dipakai untuk mendeteksi sdg_start_year dari data. +# +# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy: +# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya +# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year +# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'. +# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs +# dan dapat dipakai sebagai penanda tahun mulainya era SDGs. +# ============================================================================= _SDG_ERA_PROXY_KEYWORDS = frozenset([ "food insecurity", - "anemia", - "anaemia", + "food insecure", ]) # ============================================================================= @@ -100,8 +119,8 @@ _SDG_ERA_PROXY_KEYWORDS = frozenset([ # ============================================================================= # Digunakan untuk assign kondisi di analysis_layer. # Didefinisikan di sini agar konsisten antara kedua file. -# bad : norm_value_1_100 < THRESHOLD_BAD -# good : norm_value_1_100 > THRESHOLD_GOOD +# bad : norm_value_1_100 < THRESHOLD_BAD +# good : norm_value_1_100 > THRESHOLD_GOOD # moderate : di antara keduanya THRESHOLD_BAD = 40.0 @@ -124,19 +143,40 @@ def assign_condition(norm_value_1_100: float) -> str: return 'moderate' -def assign_framework( +def assign_framework_per_row( indicator_name: str, - actual_start_year: int, + year: int, sdg_start_year: int, ) -> str: """ - Tentukan framework (MDGs/SDGs) per indikator. - 'SDGs' jika nama ada di SDG_INDICATOR_KEYWORDS DAN actual_start_year >= sdg_start_year. - 'MDGs' untuk semua kasus lainnya. + Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator. + + Logic: + - 'SDGs' jika KEDUA kondisi terpenuhi: + 1. Nama indikator ada di SDG_INDICATOR_KEYWORDS + 2. year (tahun baris ini) >= sdg_start_year + - 'MDGs' untuk semua kasus lain. + + Mengapa per row, bukan per indikator? + Indikator "shared" seperti anemia, stunting, wasting, undernourishment + memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan + per indikator menggunakan actual_start_year, indikator-indikator ini + akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year. + Dengan assign per row menggunakan year baris: + - baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs) + - baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs) + + Contoh anemia (sdg_start_year = 2016): + - row year=2013 -> 'MDGs' + - row year=2014 -> 'MDGs' + - row year=2015 -> 'MDGs' + - row year=2016 -> 'SDGs' + - row year=2017 -> 'SDGs' + - ... """ name_lower = str(indicator_name).lower().strip() in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS - if in_sdg_list and actual_start_year >= sdg_start_year: + if in_sdg_list and int(year) >= sdg_start_year: return 'SDGs' return 'MDGs' @@ -154,8 +194,14 @@ class AnalyticalLayerLoader: indicator_id, indicator_name, direction, framework, pillar_id, pillar_name, time_id, year, value, - norm_value_1_100, <- NEWmin-max norm per indikator, skala 1-100, direction-aware + norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware yoy_change, yoy_pct + + Catatan framework: + Framework di-assign PER BARIS (per year), sehingga indikator shared + seperti anemia dapat memiliki framework berbeda di baris yang berbeda: + - baris sebelum sdg_start_year -> 'MDGs' + - baris sejak sdg_start_year -> 'SDGs' """ def __init__(self, client: bigquery.Client): @@ -260,6 +306,14 @@ class AnalyticalLayerLoader: self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) + # Filter single years only (is_year_range == False) + if 'is_year_range' in self.df_clean.columns: + before = len(self.df_clean) + self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy() + self.logger.info( + f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows" + ) + # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] baseline_indicator_count = df_baseline['indicator_id'].nunique() @@ -475,6 +529,11 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") + if removed_indicators: + self.logger.info(f"\n Removed indicators:") + for item in removed_indicators: + self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}") + if not valid_indicators: raise ValueError("No valid indicators found after filtering!") @@ -500,13 +559,18 @@ class AnalyticalLayerLoader: return self.df_clean # ------------------------------------------------------------------ - # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK + # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW # ------------------------------------------------------------------ def determine_sdg_start_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK") + self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW") self.logger.info("=" * 80) + self.logger.info( + " Proxy: FIES only (food insecurity/food insecure).\n" + " Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n" + " Framework di-assign PER BARIS (year), bukan per indikator." + ) # actual_start_year per indikator = max(min_year per country) # = konsisten dengan max_start_year di Step 5 @@ -519,7 +583,7 @@ class AnalyticalLayerLoader: ) indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] - # Deteksi sdg_start_year dari proxy SDGs-only (FIES & anaemia) + # Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia) proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply( lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS) ) @@ -527,51 +591,78 @@ class AnalyticalLayerLoader: if df_proxy.empty: raise ValueError( - "Tidak ada indikator proxy SDGs (FIES/anaemia) yang lolos filter. " - "Pastikan indikator FIES dan anaemia ada di data." + "Tidak ada indikator proxy SDGs (FIES) yang lolos filter. " + "Pastikan indikator FIES (food insecurity/food insecure) ada di data." ) self.sdg_start_year = int(df_proxy['actual_start_year'].min()) self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}") - self.logger.info(f" Proxy indicators:") + self.logger.info(f" Proxy indicators (FIES only):") for _, row in df_proxy.iterrows(): self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") - # Assign framework per indikator - indicator_actual_start['framework'] = indicator_actual_start.apply( - lambda row: assign_framework( - indicator_name = row['indicator_name'], - actual_start_year = int(row['actual_start_year']), - sdg_start_year = self.sdg_start_year, + # ---------------------------------------------------------------- + # Assign framework PER BARIS menggunakan year baris, bukan actual_start_year + # Sehingga indikator "shared" (anemia, stunting, dll) mendapat: + # - 'MDGs' untuk baris sebelum sdg_start_year + # - 'SDGs' untuk baris sejak sdg_start_year + # ---------------------------------------------------------------- + self.df_clean['framework'] = self.df_clean.apply( + lambda row: assign_framework_per_row( + indicator_name = row['indicator_name'], + year = int(row['year']), + sdg_start_year = self.sdg_start_year, ), axis=1 ) - # Log hasil - self.logger.info(f"\n Framework assignment:") - self.logger.info(f" {'-'*80}") - self.logger.info(f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}") - self.logger.info(f" {'-'*80}") - for _, row in indicator_actual_start.sort_values( - ['framework', 'actual_start_year', 'indicator_name'] - ).iterrows(): - self.logger.info( - f" {int(row['indicator_id']):<5} {row['framework']:<10} " - f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}" - ) - - fw_summary = indicator_actual_start['framework'].value_counts() - self.logger.info(f"\n Ringkasan: " + " | ".join(f"{fw}: {cnt}" for fw, cnt in fw_summary.items())) - - # Merge ke df_clean - self.df_clean = self.df_clean.merge( - indicator_actual_start[['indicator_id', 'framework']], + # ---------------------------------------------------------------- + # Logging: ringkasan per indikator (frameworks apa yang muncul) + # ---------------------------------------------------------------- + ind_fw_summary = ( + self.df_clean + .groupby(['indicator_id', 'indicator_name'])['framework'] + .unique() + .reset_index() + ) + ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply( + lambda x: '/'.join(sorted(x)) + ) + ind_fw_summary = ind_fw_summary.merge( + indicator_actual_start[['indicator_id', 'actual_start_year']], on='indicator_id', how='left' ) - self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') + + self.logger.info(f"\n Framework assignment per indikator:") + self.logger.info(f" {'-'*85}") + self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}") + self.logger.info(f" {'-'*85}") + for _, row in ind_fw_summary.sort_values( + ['frameworks', 'actual_start_year', 'indicator_name'] + ).iterrows(): + self.logger.info( + f" {int(row['indicator_id']):<5} {row['frameworks']:<18} " + f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}" + ) + + # Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi + split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs'] + if not split_inds.empty: + self.logger.info( + f"\n [INFO] {len(split_inds)} indikator memiliki framework split " + f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):" + ) + for _, row in split_inds.iterrows(): + self.logger.info(f" - {row['indicator_name'][:60]}") + + fw_summary = self.df_clean['framework'].value_counts() + self.logger.info( + f"\n Ringkasan rows: " + + " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()) + ) self.logger.info( - f"\n [OK] 'framework' ditambahkan — " + f"\n [OK] 'framework' ditambahkan per row — " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" ) @@ -609,7 +700,7 @@ class AnalyticalLayerLoader: return True # ------------------------------------------------------------------ - # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR PER COUNTRY + # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR # ------------------------------------------------------------------ def calculate_norm_value(self): @@ -640,7 +731,7 @@ class AnalyticalLayerLoader: "negative", "lower_better", "lower_is_better", "inverse", "neg", }) - df = self.df_clean.copy() + df = self.df_clean.copy() norm_parts = [] indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) @@ -764,36 +855,52 @@ class AnalyticalLayerLoader: ) indicator_details = self.df_clean.groupby([ - 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework' + 'indicator_id', 'indicator_name', 'pillar_name', 'direction' ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index() indicator_details.columns = [ - 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework', + 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'start_year', 'end_year', 'country_count' ] + + # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split) + ind_fw = ( + self.df_clean + .groupby('indicator_id')['framework'] + .unique() + .reset_index() + ) + ind_fw['framework_label'] = ind_fw['framework'].apply( + lambda x: '/'.join(sorted(x)) + ) + indicator_details = indicator_details.merge( + ind_fw[['indicator_id', 'framework_label']], + on='indicator_id', how='left' + ) + indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) indicator_details = indicator_details.sort_values( - ['framework', 'pillar_name', 'start_year', 'indicator_name'] + ['framework_label', 'pillar_name', 'start_year', 'indicator_name'] ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - self.logger.info(f"Framework breakdown:") - for fw, count in indicator_details.groupby('framework').size().items(): + self.logger.info(f"Framework breakdown (per indicator label):") + for fw, count in indicator_details.groupby('framework_label').size().items(): self.logger.info(f" {fw}: {count} indicators") - self.logger.info(f"\n{'-'*110}") + self.logger.info(f"\n{'-'*115}") self.logger.info( f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " - f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" + f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}" ) - self.logger.info(f"{'-'*110}") + self.logger.info(f"{'-'*115}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " + f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} " f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) @@ -856,9 +963,22 @@ class AnalyticalLayerLoader: self.logger.info(f" Total rows: {len(analytical_df):,}") - fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() - self.logger.info(f" Framework distribution:") - for fw, cnt in fw_dist.items(): + # Framework distribution per row + fw_dist_rows = analytical_df['framework'].value_counts() + self.logger.info(f" Framework distribution (rows):") + for fw, cnt in fw_dist_rows.items(): + self.logger.info(f" {fw}: {cnt:,} rows") + + # Framework distribution per indikator (label) + ind_fw_label = ( + analytical_df + .groupby('indicator_id')['framework'] + .unique() + .apply(lambda x: '/'.join(sorted(x))) + .value_counts() + ) + self.logger.info(f" Framework distribution (per indicator label):") + for fw, cnt in ind_fw_label.items(): self.logger.info(f" {fw}: {cnt} indicators") self.logger.info( @@ -902,23 +1022,26 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_start_year' : self.sdg_start_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'condition_thresholds': { + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_assignment' : 'per-row by year (not per-indicator)', + 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS), + 'condition_thresholds' : { 'bad' : f'< {THRESHOLD_BAD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'good' : f'> {THRESHOLD_GOOD}', }, }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators': int(self.df_clean['indicator_id'].nunique()), - 'sdg_start_year' : self.sdg_start_year, - 'framework_dist' : fw_dist.to_dict(), + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), + 'sdg_start_year' : self.sdg_start_year, + 'framework_dist_rows' : fw_dist_rows.to_dict(), + 'framework_dist_inds' : ind_fw_label.to_dict(), }) } save_etl_metadata(self.client, metadata) @@ -941,6 +1064,8 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") + self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)") + self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") self.logger.info("=" * 80) @@ -949,10 +1074,10 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.determine_sdg_start_year() + self.determine_sdg_start_year() # Step 6: per-row framework assignment self.verify_no_gaps() - self.calculate_norm_value() # Step 8: norm_value_1_100 - self.calculate_yoy() # Step 9: yoy_change, yoy_pct + self.calculate_norm_value() # Step 8: norm_value_1_100 + self.calculate_yoy() # Step 9: yoy_change, yoy_pct self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -991,6 +1116,7 @@ if __name__ == "__main__": print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") + print(f"Framework: per-row by year | SDG Proxy: FIES only") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") print("=" * 80)