diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 969fdcc..4b64008 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -22,17 +22,16 @@ NORMALISASI (Step 8): sehingga nilai antar negara dan antar tahun tetap comparable - Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio -FRAMEWORK LOGIC (FIX - Row-Level Assignment): +FRAMEWORK LOGIC (Row-Level Assignment): - SDG start year dideteksi dari data: tahun pertama indikator FIES/anaemia lengkap di semua fixed countries (setelah Step 3-5 filter selesai) -- Framework di-assign PER BARIS (per tahun), bukan per indikator: - * Jika row['year'] < sdg_start_year -> selalu 'MDGs' - * Jika row['year'] >= sdg_start_year DAN - nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' - * Selain itu -> 'MDGs' -- Dengan demikian, indikator seperti "Prevalence of anemia" yang datanya dimulai - sebelum era SDGs akan berlabel 'MDGs' untuk tahun-tahun pra-SDGs dan 'SDGs' - untuk tahun-tahun pasca (>= sdg_start_year). +- Framework di-assign PER BARIS (per tahun): + * year < sdg_start_year → selalu 'MDGs' (semua indikator) + * year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS → 'SDGs' + * selain itu (implisit) → 'MDGs' +- Hanya FIES dan anaemia yang masuk SDG_ONLY_KEYWORDS karena murni baru di era SDGs. +- Shared indicators (stunting, wasting, overweight, undernourishment) tidak terdaftar + di SDG_ONLY_KEYWORDS sehingga secara implisit selalu berlabel 'MDGs' di semua tahun. """ import pandas as pd @@ -59,13 +58,13 @@ from google.cloud import bigquery # ============================================================================= -# SDG INDICATOR KEYWORDS +# SDG-ONLY INDICATOR KEYWORDS # ============================================================================= +# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini. +# Baris dengan year >= sdg_start_year + nama ada di set ini → 'SDGs'. +# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' secara implisit. -SDG_INDICATOR_KEYWORDS = frozenset([ - # TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) - "prevalence of undernourishment (percent) (3-year average)", - "number of people undernourished (million) (3-year average)", +SDG_ONLY_KEYWORDS = frozenset([ # TARGET 2.1.2 — FIES (SDGs only) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", @@ -79,15 +78,7 @@ SDG_INDICATOR_KEYWORDS = frozenset([ "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.1 — Stunting (shared) - "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", - "number of children under 5 years of age who are stunted (modeled estimates) (million)", - # TARGET 2.2.2 — Wasting & Overweight (shared) - "percentage of children under 5 years affected by wasting (percent)", - "number of children under 5 years affected by wasting (million)", - "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", - "number of children under 5 years of age who are overweight (modeled estimates) (million)", - # TARGET 2.2.3 — Anaemia (SDGs only — listed here so rows >= sdg_start_year become SDGs) + # TARGET 2.2.3 — Anaemia (SDGs only) "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) @@ -102,8 +93,6 @@ _SDG_ERA_PROXY_KEYWORDS = frozenset([ # ============================================================================= # THRESHOLD KONDISI (fixed absolute, skala 1-100) # ============================================================================= -# Digunakan untuk assign kondisi di analysis_layer. -# Didefinisikan di sini agar konsisten antara kedua file. # bad : norm_value_1_100 < THRESHOLD_BAD # good : norm_value_1_100 > THRESHOLD_GOOD # moderate : di antara keduanya @@ -134,27 +123,29 @@ def assign_framework_for_row( sdg_start_year: int, ) -> str: """ - Tentukan framework (MDGs/SDGs) PER BARIS (per tahun), bukan per indikator. + Tentukan framework (MDGs/SDGs) PER BARIS (per tahun). Logic: - - Jika row_year < sdg_start_year → selalu 'MDGs', apapun nama indikatornya. - - Jika row_year >= sdg_start_year DAN nama ada di SDG_INDICATOR_KEYWORDS → 'SDGs'. - - Selain itu → 'MDGs'. + ───────────────────────────────────────────────────────────────────────── + RULE 1: row_year < sdg_start_year + → selalu 'MDGs', tanpa kecuali. - Dengan cara ini, indikator seperti "Prevalence of anemia" yang datanya - ada sebelum era SDGs akan berlabel 'MDGs' untuk tahun-tahun pra-SDGs, - dan 'SDGs' untuk tahun-tahun pasca sdg_start_year. + RULE 2: row_year >= sdg_start_year AND nama ada di SDG_ONLY_KEYWORDS + → 'SDGs' + + RULE 3 (implisit): semua kondisi lain + → 'MDGs' + Ini mencakup shared indicators (stunting, wasting, overweight, + undernourishment) yang tidak terdaftar di SDG_ONLY_KEYWORDS, + sehingga tidak perlu di-list secara eksplisit. + ───────────────────────────────────────────────────────────────────────── """ - # Tahun sebelum era SDGs → selalu MDGs if row_year < sdg_start_year: return 'MDGs' - # Tahun >= sdg_start_year: cek apakah nama ada di SDG list - name_lower = str(indicator_name).lower().strip() - if name_lower in SDG_INDICATOR_KEYWORDS: + if str(indicator_name).lower().strip() in SDG_ONLY_KEYWORDS: return 'SDGs' - # Tidak ada di SDG list → MDGs return 'MDGs' @@ -174,10 +165,10 @@ class AnalyticalLayerLoader: norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware yoy_change, yoy_pct - PERUBAHAN (framework fix): - - framework di-assign per baris (per tahun), bukan per indikator. - - Baris dengan year < sdg_start_year selalu 'MDGs'. - - Baris dengan year >= sdg_start_year dan nama di SDG_INDICATOR_KEYWORDS → 'SDGs'. + FRAMEWORK LOGIC: + - year < sdg_start_year → 'MDGs' (semua indikator) + - year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS → 'SDGs' (FIES + anaemia) + - selain itu (implisit) → 'MDGs' """ def __init__(self, client: bigquery.Client): @@ -282,7 +273,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) - # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] baseline_indicator_count = df_baseline['indicator_id'].nunique() @@ -522,7 +512,7 @@ class AnalyticalLayerLoader: return self.df_clean # ------------------------------------------------------------------ - # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL FIX) + # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL) # ------------------------------------------------------------------ def determine_sdg_start_year(self): @@ -530,8 +520,6 @@ class AnalyticalLayerLoader: self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL)") self.logger.info("=" * 80) - # actual_start_year per indikator = max(min_year per country) - # = konsisten dengan max_start_year di Step 5 indicator_actual_start = ( self.df_clean .groupby(['indicator_id', 'indicator_name', 'country_id'])['year'] @@ -559,18 +547,10 @@ class AnalyticalLayerLoader: for _, row in df_proxy.iterrows(): self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") - # ------------------------------------------------------------------ - # FIX: Assign framework PER BARIS (per tahun), bukan per indikator - # ------------------------------------------------------------------ - # Logic: - # row['year'] < sdg_start_year → 'MDGs' (apapun nama indikatornya) - # row['year'] >= sdg_start_year + nama di SDG_INDICATOR_KEYWORDS → 'SDGs' - # selain itu → 'MDGs' - # ------------------------------------------------------------------ - self.logger.info(f"\n Assigning framework PER ROW (year-level)...") - self.logger.info(f" Rule: year < {self.sdg_start_year} → MDGs (always)") - self.logger.info(f" Rule: year >= {self.sdg_start_year} + name in SDG list → SDGs") - self.logger.info(f" Rule: year >= {self.sdg_start_year} + name NOT in SDG list → MDGs") + self.logger.info(f"\n Assigning framework PER ROW...") + self.logger.info(f" year < {self.sdg_start_year} → MDGs (semua indikator)") + self.logger.info(f" year >= {self.sdg_start_year} + nama in SDG_ONLY_KEYWORDS → SDGs") + self.logger.info(f" selain itu (implisit) → MDGs") self.df_clean['framework'] = self.df_clean.apply( lambda row: assign_framework_for_row( @@ -582,22 +562,24 @@ class AnalyticalLayerLoader: ) # Log ringkasan per indikator untuk verifikasi - self.logger.info(f"\n {'Framework Assignment per Indicator (sample)':}") - self.logger.info(f" {'-'*95}") + self.logger.info(f"\n {'Framework Assignment per Indicator':}") + self.logger.info(f" {'-'*100}") self.logger.info( - f" {'ID':<5} {'Indicator Name':<50} " - f"{'Pre-SDG rows':<15} {'MDGs rows':<12} {'SDGs rows'}" + f" {'ID':<5} {'Indicator Name':<52} " + f"{'Pre-SDG':<10} {'MDGs':<10} {'SDGs':<10} {'SDG-Only?'}" ) - self.logger.info(f" {'-'*95}") + self.logger.info(f" {'-'*100}") for ind_id, grp in self.df_clean.groupby('indicator_id'): ind_name = grp['indicator_name'].iloc[0] pre_sdg = (grp['year'] < self.sdg_start_year).sum() mdgs_rows = (grp['framework'] == 'MDGs').sum() sdgs_rows = (grp['framework'] == 'SDGs').sum() + is_sdg_only = ind_name.lower().strip() in SDG_ONLY_KEYWORDS self.logger.info( - f" {int(ind_id):<5} {ind_name[:48]:<50} " - f"{pre_sdg:<15} {mdgs_rows:<12} {sdgs_rows}" + f" {int(ind_id):<5} {ind_name[:50]:<52} " + f"{pre_sdg:<10} {mdgs_rows:<10} {sdgs_rows:<10} " + f"{'YES' if is_sdg_only else 'no'}" ) fw_summary = self.df_clean['framework'].value_counts() @@ -605,15 +587,14 @@ class AnalyticalLayerLoader: f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items() )) - # Ringkasan unique indicators per framework di tahun terbaru (end_year) - end_year_df = self.df_clean[self.df_clean['year'] == self.end_year] + end_year_df = self.df_clean[self.df_clean['year'] == self.end_year] fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique() self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join( f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items() )) self.logger.info( - f"\n [OK] 'framework' ditambahkan (row-level) — " + f"\n [OK] 'framework' ditambahkan — " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" ) @@ -651,7 +632,7 @@ class AnalyticalLayerLoader: return True # ------------------------------------------------------------------ - # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR PER COUNTRY + # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR # ------------------------------------------------------------------ def calculate_norm_value(self): @@ -663,16 +644,7 @@ class AnalyticalLayerLoader: - Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus) sehingga nilai antar negara dan antar tahun tetap comparable. - lower_better diinvert: nilai tinggi selalu = kondisi lebih baik. - Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert. - Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio. - - Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan - (persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama. - - Catatan: - - Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer - yang skala 0-1 dan dipakai untuk agregasi composite score. - - norm_value_1_100 ini adalah per baris (per country per year per indicator), - untuk ditampilkan langsung di Looker Studio. """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") @@ -682,7 +654,7 @@ class AnalyticalLayerLoader: "negative", "lower_better", "lower_is_better", "inverse", "neg", }) - df = self.df_clean.copy() + df = self.df_clean.copy() norm_parts = [] indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) @@ -700,21 +672,17 @@ class AnalyticalLayerLoader: norm_parts.append(grp) continue - raw = grp.loc[valid_mask, 'value'].values - v_min = raw.min() - v_max = raw.max() - normed = np.full(len(grp), np.nan) + raw = grp.loc[valid_mask, 'value'].values + v_min = raw.min() + v_max = raw.max() + normed = np.full(len(grp), np.nan) if v_min == v_max: - # Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100) normed[valid_mask.values] = 50.5 else: - # Min-max ke 0-1 dulu scaled = (raw - v_min) / (v_max - v_min) - # Invert jika lower_better if do_invert: scaled = 1.0 - scaled - # Scale ke 1-100 normed[valid_mask.values] = 1.0 + scaled * 99.0 grp['norm_value_1_100'] = normed @@ -727,7 +695,6 @@ class AnalyticalLayerLoader: self.df_clean = pd.concat(norm_parts, ignore_index=True) - # Statistik ringkasan valid_norm = self.df_clean['norm_value_1_100'].notna().sum() null_norm = self.df_clean['norm_value_1_100'].isna().sum() self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") @@ -737,7 +704,6 @@ class AnalyticalLayerLoader: f"{self.df_clean['norm_value_1_100'].max():.2f}" ) - # Log distribusi kondisi berdasarkan threshold self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition) cond_dist = self.df_clean['_condition_preview'].value_counts() self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):") @@ -813,7 +779,6 @@ class AnalyticalLayerLoader: 'start_year', 'end_year', 'country_count' ] - # Framework per indikator di end_year (untuk display — representasi terbaru) fw_at_end = ( self.df_clean[self.df_clean['year'] == self.end_year] .groupby('indicator_id')['framework'] @@ -909,13 +874,11 @@ class AnalyticalLayerLoader: self.logger.info(f" Total rows: {len(analytical_df):,}") - # Framework distribution per row fw_dist_rows = analytical_df['framework'].value_counts() self.logger.info(f" Framework distribution (rows):") for fw, cnt in fw_dist_rows.items(): self.logger.info(f" {fw}: {cnt:,} rows") - # Framework distribution per unique indicator (at end_year) fw_dist_ind = ( analytical_df[analytical_df['year'] == self.end_year] .drop_duplicates('indicator_id')['framework'] @@ -966,14 +929,19 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_start_year' : self.sdg_start_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'framework_logic' : 'row-level: year < sdg_start_year → MDGs always', - 'condition_thresholds': { + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_logic' : ( + 'row-level: year < sdg_start_year → MDGs always; ' + 'year >= sdg_start_year + SDG_ONLY_KEYWORDS → SDGs; ' + 'else (implicit) → MDGs' + ), + 'sdg_only_keywords_count' : len(SDG_ONLY_KEYWORDS), + 'condition_thresholds' : { 'bad' : f'< {THRESHOLD_BAD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'good' : f'> {THRESHOLD_GOOD}', @@ -1007,7 +975,7 @@ class AnalyticalLayerLoader: self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info("Framework: row-level (year < sdg_start_year → MDGs always)") + self.logger.info("Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)") self.logger.info("=" * 80) self.load_source_data() @@ -1017,8 +985,8 @@ class AnalyticalLayerLoader: self.filter_indicators_consistent_across_fixed_countries() self.determine_sdg_start_year() self.verify_no_gaps() - self.calculate_norm_value() # Step 8: norm_value_1_100 - self.calculate_yoy() # Step 9: yoy_change, yoy_pct + self.calculate_norm_value() + self.calculate_yoy() self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -1058,7 +1026,7 @@ if __name__ == "__main__": print("Output: fact_asean_food_security_selected -> fs_asean_gold") print(f"Norm: min-max 1-100 per indicator, direction-aware") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - print(f"Framework: row-level (year < sdg_start_year → MDGs always)") + print(f"Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)") print("=" * 80) logger = setup_logging()