diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 018be28..bf1381e 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -1,14 +1,39 @@ """ BIGQUERY ANALYTICAL LAYER - DATA FILTERING -FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') +fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) -2. Determine year boundaries (2013 - auto-detected end year) +2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Save analytical table (dengan nama/label lengkap untuk Looker Studio) +6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year +7. Verify no gaps +8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) +9. Calculate YoY per indicator per country +10. Analyze indicator availability by year +11. 
Save analytical table + +NORMALISASI (Step 8): +- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100 +- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik +- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus) + sehingga nilai antar negara dan antar tahun tetap comparable +- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio + +FRAMEWORK LOGIC (Per-Row, bukan per indikator): +- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap + di semua fixed countries (setelah Step 3-5 filter selesai) +- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure") + Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs +- Framework di-assign PER BARIS (per year), bukan per indikator: + * row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' + * Selain itu -> 'MDGs' +- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment) + yang datanya ada sebelum SDGs: + * row lama (year < sdg_start_year) -> 'MDGs' + * row baru (year >= sdg_start_year) -> 'SDGs' """ import pandas as pd @@ -34,6 +59,128 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery +# ============================================================================= +# SDG INDICATOR KEYWORDS +# Daftar nama indikator (lowercase) yang masuk SDG framework. +# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year, +# dan 'MDGs' untuk baris dengan year < sdg_start_year. 
# =============================================================================

# Lower-cased indicator names that belong to the SDG framework (Goal 2).
# A row whose indicator name is in this set is labelled 'SDGs' when its year is
# >= sdg_start_year, and 'MDGs' otherwise (see assign_framework_per_row).
SDG_INDICATOR_KEYWORDS = frozenset([
    # TARGET 2.1.1 - Prevalence of undernourishment (shared: predates the SDGs)
    "prevalence of undernourishment (percent) (3-year average)",
    "number of people undernourished (million) (3-year average)",
    # TARGET 2.1.2 - FIES (SDG-only: introduced with the SDG era)
    "prevalence of severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
    "number of severely food insecure people (million) (3-year average)",
    "number of severely food insecure male adults (million) (3-year average)",
    "number of severely food insecure female adults (million) (3-year average)",
    "number of moderately or severely food insecure people (million) (3-year average)",
    "number of moderately or severely food insecure male adults (million) (3-year average)",
    "number of moderately or severely food insecure female adults (million) (3-year average)",
    # TARGET 2.2.1 - Stunting (shared: predates the SDGs)
    "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
    "number of children under 5 years of age who are stunted (modeled estimates) (million)",
    # TARGET 2.2.2 - Wasting & overweight (shared: predates the SDGs)
    "percentage of children under 5 years affected by wasting (percent)",
    "number of children under 5 years affected by wasting (million)",
    "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
    "number of children under 5 years of age who are overweight (modeled estimates) (million)",
    # TARGET 2.2.3 - Anaemia (shared: data predates the SDGs; listed here so
    # that rows with year >= sdg_start_year are labelled 'SDGs')
    "prevalence of anemia among women of reproductive age (15-49 years) (percent)",
    "number of women of reproductive age (15-49 years) affected by anemia (million)",
])

# =============================================================================
# SDG ERA PROXY KEYWORDS
# Substrings identifying the indicators that are purely SDG-era (FIES only).
# They are used to detect sdg_start_year from the data.
#
# IMPORTANT - anemia/anaemia is deliberately NOT a proxy: its data predates the
# SDG era, so using it would detect sdg_start_year too early and label every
# anemia row 'SDGs'.
# =============================================================================
_SDG_ERA_PROXY_KEYWORDS = frozenset([
    "food insecurity",
    "food insecure",
])

# =============================================================================
# CONDITION THRESHOLDS (fixed, absolute, on the 1-100 scale)
# =============================================================================
# bad      : norm_value_1_100 < THRESHOLD_BAD
# good     : norm_value_1_100 > THRESHOLD_GOOD
# moderate : everything in between (both bounds inclusive)

THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0


def assign_condition(norm_value_1_100: float) -> "str | None":
    """
    Classify a normalised score into a condition label.

    The score is expected on the 1-100 scale and already direction-aware,
    i.e. a higher value always means a better situation.

    Args:
        norm_value_1_100: normalised score, or a missing value (None/NaN).

    Returns:
        'good' if the score is strictly above THRESHOLD_GOOD,
        'bad' if it is strictly below THRESHOLD_BAD,
        'moderate' otherwise, and
        None when the score is missing (the original annotation claimed
        ``str`` only, which hid this case from callers).
    """
    # Missing scores carry no condition; checked first so the comparisons
    # below never see None.
    if pd.isna(norm_value_1_100):
        return None
    if norm_value_1_100 > THRESHOLD_GOOD:
        return 'good'
    if norm_value_1_100 < THRESHOLD_BAD:
        return 'bad'
    return 'moderate'


def assign_framework_per_row(
    indicator_name: str,
    year: int,
    sdg_start_year: int,
) -> str:
    """
    Decide the framework ('MDGs' or 'SDGs') for a single row.

    A row is 'SDGs' only when BOTH hold:
      1. the indicator name (lower-cased, stripped) is in SDG_INDICATOR_KEYWORDS
      2. the row's year is >= sdg_start_year
    Every other row is 'MDGs'.

    The decision is per row rather than per indicator because "shared"
    indicators (anemia, stunting, wasting, undernourishment) have data from
    before the SDG era: their older rows must stay 'MDGs' while rows from
    sdg_start_year onwards become 'SDGs'.

    Example for an SDG-listed indicator with sdg_start_year = 2016:
        year=2015 -> 'MDGs', year=2016 -> 'SDGs', year=2017 -> 'SDGs'

    Args:
        indicator_name: indicator name; converted with str() before matching.
        year: year of the row; converted with int() before comparing.
        sdg_start_year: first year counted as SDG era.

    Returns:
        'SDGs' or 'MDGs'.
    """
    name_lower = str(indicator_name).lower().strip()
    in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS
    # `and` short-circuits: the year is only converted for SDG-listed names.
    if in_sdg_list and int(year) >= sdg_start_year:
        return 'SDGs'
    return 'MDGs'
Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio + Output kolom fact_asean_food_security_selected: + country_id, country_name, + indicator_id, indicator_name, direction, framework, + pillar_id, pillar_name, + time_id, year, value, + norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware + yoy_change, yoy_pct - Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold + Catatan framework: + Framework di-assign PER BARIS (per year), sehingga indikator shared + seperti anemia dapat memiliki framework berbeda di baris yang berbeda: + - baris sebelum sdg_start_year -> 'MDGs' + - baris sejak sdg_start_year -> 'SDGs' """ def __init__(self, client: bigquery.Client): @@ -65,7 +218,9 @@ class AnalyticalLayerLoader: self.start_year = 2013 self.end_year = None - self.baseline_year = 2023 + self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap) + + self.sdg_start_year = None self.pipeline_metadata = { 'source_class' : self.__class__.__name__, @@ -81,6 +236,10 @@ class AnalyticalLayerLoader: self.pipeline_start = None self.pipeline_end = None + # ------------------------------------------------------------------ + # STEP 1: LOAD SOURCE DATA + # ------------------------------------------------------------------ + def load_source_data(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") @@ -111,14 +270,17 @@ class AnalyticalLayerLoader: """ self.logger.info("Loading fact table with dimensions...") - self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) + self.df_clean = self.client.query(query).result().to_dataframe( + create_bqstorage_client=False + ) self.logger.info(f" Loaded: {len(self.df_clean):,} rows") if 'is_year_range' in self.df_clean.columns: yr = self.df_clean['is_year_range'].value_counts() - self.logger.info(f" Breakdown:") - self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 
0):,}") - self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}") + self.logger.info( + f" Single years: {yr.get(False, 0):,} | " + f"Year ranges: {yr.get(True, 0):,}" + ) self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') @@ -135,34 +297,48 @@ class AnalyticalLayerLoader: self.logger.error(f"Error loading source data: {e}") raise + # ------------------------------------------------------------------ + # STEP 2: DETERMINE YEAR BOUNDARIES + # ------------------------------------------------------------------ + def determine_year_boundaries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) - df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year] - baseline_indicator_count = df_2023['indicator_id'].nunique() + # Filter single years only (is_year_range == False) + if 'is_year_range' in self.df_clean.columns: + before = len(self.df_clean) + self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy() + self.logger.info( + f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows" + ) - self.logger.info(f"\nBaseline Year: {self.baseline_year}") - self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}") + # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) + df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] + baseline_indicator_count = df_baseline['indicator_id'].nunique() + + self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}") + self.logger.info(f" Baseline indicator count: {baseline_indicator_count}") years_sorted = sorted(self.df_clean['year'].unique(), reverse=True) selected_end_year = None + self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):") for year in years_sorted: if year >= self.baseline_year: df_year = 
self.df_clean[self.df_clean['year'] == year] year_indicator_count = df_year['indicator_id'].nunique() status = "OK" if year_indicator_count >= baseline_indicator_count else "X" - self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") + self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") if year_indicator_count >= baseline_indicator_count and selected_end_year is None: selected_end_year = int(year) if selected_end_year is None: selected_end_year = self.baseline_year - self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}") + self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}") else: - self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}") + self.logger.info(f"\n [OK] Selected end year: {selected_end_year}") self.end_year = selected_end_year original_count = len(self.df_clean) @@ -172,11 +348,15 @@ class AnalyticalLayerLoader: (self.df_clean['year'] <= self.end_year) ].copy() - self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:") - self.logger.info(f" Rows before: {original_count:,}") - self.logger.info(f" Rows after: {len(self.df_clean):,}") + self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:") + self.logger.info(f" Rows before: {original_count:,}") + self.logger.info(f" Rows after : {len(self.df_clean):,}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY + # ------------------------------------------------------------------ + def filter_complete_indicators_per_country(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") @@ -228,10 +408,15 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}") - df_valid = pd.DataFrame(valid_combinations) - df_valid['key'] = 
df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str) - self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' + - self.df_clean['indicator_id'].astype(str)) + df_valid = pd.DataFrame(valid_combinations) + df_valid['key'] = ( + df_valid['country_id'].astype(str) + '_' + + df_valid['indicator_id'].astype(str) + ) + self.df_clean['key'] = ( + self.df_clean['country_id'].astype(str) + '_' + + self.df_clean['indicator_id'].astype(str) + ) original_count = len(self.df_clean) self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() @@ -243,6 +428,10 @@ class AnalyticalLayerLoader: self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 4: SELECT COUNTRIES WITH ALL PILLARS + # ------------------------------------------------------------------ + def select_countries_with_all_pillars(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") @@ -265,18 +454,26 @@ class AnalyticalLayerLoader: f"{row['pillar_count']}/{total_pillars} pillars" ) - selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars] + selected_countries = country_pillar_count[ + country_pillar_count['pillar_count'] == total_pillars + ] self.selected_country_ids = selected_countries['country_id'].tolist() self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") original_count = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy() + self.df_clean = self.df_clean[ + self.df_clean['country_id'].isin(self.selected_country_ids) + ].copy() self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 5: 
FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES + # ------------------------------------------------------------------ + def filter_indicators_consistent_across_fixed_countries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") @@ -285,7 +482,9 @@ class AnalyticalLayerLoader: indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() - indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year'] + indicator_country_start.columns = [ + 'indicator_id', 'indicator_name', 'country_id', 'start_year' + ] indicator_max_start = indicator_country_start.groupby([ 'indicator_id', 'indicator_name' @@ -330,16 +529,26 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") + if removed_indicators: + self.logger.info(f"\n Removed indicators:") + for item in removed_indicators: + self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}") + if not valid_indicators: raise ValueError("No valid indicators found after filtering!") original_count = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy() + self.df_clean = self.df_clean[ + self.df_clean['indicator_id'].isin(valid_indicators) + ].copy() self.df_clean = self.df_clean.merge( - indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left' + indicator_max_start[['indicator_id', 'max_start_year']], + on='indicator_id', how='left' ) - self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy() + self.df_clean = self.df_clean[ + self.df_clean['year'] >= self.df_clean['max_start_year'] + ].copy() self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") @@ -349,18 +558,136 @@ class 
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------

def determine_sdg_start_year(self):
    """
    Detect ``self.sdg_start_year`` and add a per-row 'framework' column.

    Steps:
      1. For every indicator compute ``actual_start_year``: the latest of the
         per-country first years found in ``self.df_clean``.
      2. Keep the indicators whose lower-cased name contains one of
         ``_SDG_ERA_PROXY_KEYWORDS``; the smallest ``actual_start_year``
         among them becomes ``self.sdg_start_year``.
      3. Label every row of ``self.df_clean`` with
         ``assign_framework_per_row`` using the row's own year, so a shared
         indicator can be 'MDGs' in early rows and 'SDGs' in later ones.
      4. Log a per-indicator and per-row summary.

    Reads the columns 'indicator_id', 'indicator_name', 'country_id' and
    'year' of ``self.df_clean``.

    Returns:
        ``self.df_clean`` with the new 'framework' column.

    Raises:
        ValueError: if no proxy indicator is present in ``self.df_clean``.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW")
    self.logger.info("=" * 80)
    self.logger.info(
        " Proxy: FIES only (food insecurity/food insecure).\n"
        " Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n"
        " Framework di-assign PER BARIS (year), bukan per indikator."
    )

    # actual_start_year per indicator = max over countries of (first year
    # of that indicator in that country).
    indicator_actual_start = (
        self.df_clean
        .groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
        .min().reset_index()
        .groupby(['indicator_id', 'indicator_name'])['year']
        .max().reset_index()
    )
    indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']

    # Detect sdg_start_year from the SDG-only proxies (substring match).
    proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
        lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
    )
    df_proxy = indicator_actual_start[proxy_mask]

    if df_proxy.empty:
        raise ValueError(
            "Tidak ada indikator proxy SDGs (FIES) yang lolos filter. "
            "Pastikan indikator FIES (food insecurity/food insecure) ada di data."
        )

    self.sdg_start_year = int(df_proxy['actual_start_year'].min())
    sdg_start_year = self.sdg_start_year
    self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
    self.logger.info(f" Proxy indicators (FIES only):")
    for _, row in df_proxy.iterrows():
        self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")

    # Assign the framework per row from the row's own year. Iterating the
    # two columns with zip avoids DataFrame.apply(axis=1), which builds a
    # Series object for every row, while keeping assign_framework_per_row
    # as the single place where the rule lives.
    self.df_clean['framework'] = [
        assign_framework_per_row(
            indicator_name=name,
            year=int(year),
            sdg_start_year=sdg_start_year,
        )
        for name, year in zip(self.df_clean['indicator_name'], self.df_clean['year'])
    ]

    # Per-indicator summary: which framework labels occur for each one.
    ind_fw_summary = (
        self.df_clean
        .groupby(['indicator_id', 'indicator_name'])['framework']
        .unique()
        .reset_index()
    )
    ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply(
        lambda x: '/'.join(sorted(x))
    )
    ind_fw_summary = ind_fw_summary.merge(
        indicator_actual_start[['indicator_id', 'actual_start_year']],
        on='indicator_id', how='left'
    )

    self.logger.info(f"\n Framework assignment per indikator:")
    self.logger.info(f" {'-'*85}")
    self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}")
    self.logger.info(f" {'-'*85}")
    for _, row in ind_fw_summary.sort_values(
        ['frameworks', 'actual_start_year', 'indicator_name']
    ).iterrows():
        self.logger.info(
            f" {int(row['indicator_id']):<5} {row['frameworks']:<18} "
            f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}"
        )

    # Indicators whose rows carry both labels - highlighted for validation.
    split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs']
    if not split_inds.empty:
        self.logger.info(
            f"\n [INFO] {len(split_inds)} indikator memiliki framework split "
            f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):"
        )
        for _, row in split_inds.iterrows():
            self.logger.info(f" - {row['indicator_name'][:60]}")

    fw_summary = self.df_clean['framework'].value_counts()
    self.logger.info(
        f"\n Ringkasan rows: "
        + " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items())
    )

    self.logger.info(
        f"\n [OK] 'framework' ditambahkan per row — "
        f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
        f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
    )
    return self.df_clean
# ------------------------------------------------------------------
# STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR
# ------------------------------------------------------------------

def calculate_norm_value(self):
    """
    Add 'norm_value_1_100': a direction-aware min-max score per indicator.

    How it works:
      - Rows of ``self.df_clean`` are grouped by
        (indicator_id, indicator_name, direction); min and max are taken
        over ALL rows of the group (every country and year together), so
        scores stay comparable across countries and years.
      - Values are scaled to 0-1, inverted when the direction is one of the
        "lower is better" spellings, then mapped to 1-100.
      - A group whose values are all equal gets the midpoint 50.5.
      - A group with fewer than two non-null values gets NaN.
      - Null values stay NaN.

    Groups are formed with ``dropna=False`` so rows whose direction (or name)
    is null are kept instead of silently vanishing from ``self.df_clean``;
    such rows are treated as "higher is better". An empty frame yields an
    empty NaN column instead of a ``pd.concat`` error.

    Note: the resulting row order follows the grouping, and the index is
    reset.

    Returns:
        ``self.df_clean`` with the new 'norm_value_1_100' column.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
    self.logger.info("=" * 80)

    DIRECTION_INVERT = frozenset({
        "negative", "lower_better", "lower_is_better", "inverse", "neg",
    })

    df = self.df_clean.copy()
    norm_parts = []

    # dropna=False: keep groups whose key contains a null value.
    indicators = df.groupby(
        ['indicator_id', 'indicator_name', 'direction'], dropna=False
    )
    self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
    self.logger.info(f" {'-'*90}")

    for (ind_id, ind_name, direction), grp in indicators:
        grp = grp.copy()
        do_invert = str(direction).lower().strip() in DIRECTION_INVERT
        valid_mask = grp['value'].notna()
        n_valid = valid_mask.sum()

        # Min-max needs at least two observations to be meaningful.
        if n_valid < 2:
            grp['norm_value_1_100'] = np.nan
            norm_parts.append(grp)
            continue

        raw = grp.loc[valid_mask, 'value'].values
        v_min = raw.min()
        v_max = raw.max()
        normed = np.full(len(grp), np.nan)

        if v_min == v_max:
            # Constant series: use the midpoint of the 1-100 scale.
            normed[valid_mask.values] = 50.5
        else:
            scaled = (raw - v_min) / (v_max - v_min)
            if do_invert:
                scaled = 1.0 - scaled
            # 0-1 -> 1-100
            normed[valid_mask.values] = 1.0 + scaled * 99.0

        grp['norm_value_1_100'] = normed

        # str()/isna guards only matter for null group keys, which the
        # original grouping never produced.
        id_label = str(ind_id) if pd.isna(ind_id) else int(ind_id)
        self.logger.info(
            f" {id_label:<5} {str(direction):<15} {'YES' if do_invert else 'no':<8} "
            f"{v_min:>10.3f} {v_max:>10.3f} {str(ind_name)[:45]}"
        )
        norm_parts.append(grp)

    if norm_parts:
        self.df_clean = pd.concat(norm_parts, ignore_index=True)
    else:
        # Nothing to normalise: keep the (empty) frame and add the column.
        df['norm_value_1_100'] = np.nan
        self.df_clean = df.reset_index(drop=True)

    # Summary statistics
    valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
    null_norm = self.df_clean['norm_value_1_100'].isna().sum()
    self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
    self.logger.info(
        f" Range aktual: "
        f"{self.df_clean['norm_value_1_100'].min():.2f} - "
        f"{self.df_clean['norm_value_1_100'].max():.2f}"
    )

    # Condition distribution, for logging only (no column is kept).
    cond_dist = self.df_clean['norm_value_1_100'].apply(assign_condition).value_counts()
    self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
    for cond, cnt in cond_dist.items():
        self.logger.info(f" {cond}: {cnt:,} rows")

    self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean")
    return self.df_clean


# ------------------------------------------------------------------
# STEP 9: CALCULATE YOY
# ------------------------------------------------------------------

def calculate_yoy(self):
    """
    Add year-over-year columns per (country_id, indicator_id).

    ``self.df_clean`` is sorted by country, indicator and year, then:
      - 'yoy_change' = value - value of the previous year
      - 'yoy_pct'    = yoy_change / |previous value| * 100

    Both are NaN for the first year of a series. They are also NaN when the
    previous row is not exactly one year earlier, so a gap in the series is
    never reported as a one-year change. 'yoy_pct' is additionally NaN when
    the previous value is 0 or null.

    Returns:
        ``self.df_clean`` (sorted) with 'yoy_change' and 'yoy_pct' added.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
    self.logger.info("=" * 80)

    df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()

    series = df.groupby(['country_id', 'indicator_id'])
    value_prev = series['value'].shift(1)
    year_prev = series['year'].shift(1)
    # Only a row exactly one year earlier counts as "previous year".
    value_prev = value_prev.where((df['year'] - year_prev) == 1)

    df['yoy_change'] = df['value'] - value_prev
    df['yoy_pct'] = np.where(
        value_prev.notna() & (value_prev != 0),
        (df['yoy_change'] / value_prev.abs()) * 100,
        np.nan
    )

    total_rows = len(df)
    valid_yoy = df['yoy_pct'].notna().sum()
    null_yoy = df['yoy_pct'].isna().sum()

    self.logger.info(f" Total rows : {total_rows:,}")
    self.logger.info(f" YoY calculated : {valid_yoy:,}")
    self.logger.info(f" YoY NULL (base yr): {null_yoy:,}")

    self.df_clean = df
    self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan")
    return self.df_clean
AVAILABILITY BY YEAR") + self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("=" * 80) year_stats = self.df_clean.groupby('year').agg({ @@ -400,89 +861,147 @@ class AnalyticalLayerLoader: 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'start_year', 'end_year', 'country_count' ] + + # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split) + ind_fw = ( + self.df_clean + .groupby('indicator_id')['framework'] + .unique() + .reset_index() + ) + ind_fw['framework_label'] = ind_fw['framework'].apply( + lambda x: '/'.join(sorted(x)) + ) + indicator_details = indicator_details.merge( + ind_fw[['indicator_id', 'framework_label']], + on='indicator_id', how='left' + ) + indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) - indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name']) + indicator_details = indicator_details.sort_values( + ['framework_label', 'pillar_name', 'start_year', 'indicator_name'] + ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - for pillar, count in indicator_details.groupby('pillar_name').size().items(): - self.logger.info(f" {pillar}: {count} indicators") + self.logger.info(f"Framework breakdown (per indicator label):") + for fw, count in indicator_details.groupby('framework_label').size().items(): + self.logger.info(f" {fw}: {count} indicators") - self.logger.info(f"\n{'-'*100}") - self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}") - self.logger.info(f"{'-'*100}") + self.logger.info(f"\n{'-'*115}") + self.logger.info( + f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " + f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}" + ) + self.logger.info(f"{'-'*115}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' 
else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} " - f"{direction:<8} {int(row['country_count'])}" + f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} " + f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) return year_stats + # ------------------------------------------------------------------ + # STEP 11: SAVE ANALYTICAL TABLE + # ------------------------------------------------------------------ + def save_analytical_table(self): - # --------------------------------------------------------------- - # CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio - # --------------------------------------------------------------- table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) - self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") + self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: - # ------------------------------------------------------------------ - # Pilih kolom: ID + Nama lengkap + value - # Kolom nama memudahkan filtering/slicing langsung di Looker Studio - # tanpa perlu join ulang ke tabel dimensi. - # ------------------------------------------------------------------ + if 'framework' not in self.df_clean.columns: + raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.") + if 'norm_value_1_100' not in self.df_clean.columns: + raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.") + if 'yoy_change' not in self.df_clean.columns: + raise ValueError("Kolom 'yoy_change' tidak ada. 
Pastikan Step 9 sudah dijalankan.") + analytical_df = self.df_clean[[ 'country_id', 'country_name', 'indicator_id', 'indicator_name', 'direction', + 'framework', 'pillar_id', 'pillar_name', 'time_id', 'year', 'value', + 'norm_value_1_100', + 'yoy_change', + 'yoy_pct', ]].copy() analytical_df = analytical_df.sort_values( ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) - # Pastikan tipe data konsisten - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['country_name'] = analytical_df['country_name'].astype(str) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) - analytical_df['direction'] = analytical_df['direction'].astype(str) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['year'] = analytical_df['year'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['country_name'] = analytical_df['country_name'].astype(str) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) + analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['framework'] = analytical_df['framework'].astype(str) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['year'] = analytical_df['year'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) + analytical_df['norm_value_1_100'] = 
analytical_df['norm_value_1_100'].astype(float) + analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) + analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) - self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") - # Schema BigQuery + # Framework distribution per row + fw_dist_rows = analytical_df['framework'].value_counts() + self.logger.info(f" Framework distribution (rows):") + for fw, cnt in fw_dist_rows.items(): + self.logger.info(f" {fw}: {cnt:,} rows") + + # Framework distribution per indikator (label) + ind_fw_label = ( + analytical_df + .groupby('indicator_id')['framework'] + .unique() + .apply(lambda x: '/'.join(sorted(x))) + .value_counts() + ) + self.logger.info(f" Framework distribution (per indicator label):") + for fw, cnt in ind_fw_label.items(): + self.logger.info(f" {fw}: {cnt} indicators") + + self.logger.info( + f" norm_value_1_100 range: " + f"{analytical_df['norm_value_1_100'].min():.2f} - " + f"{analytical_df['norm_value_1_100'].max():.2f}" + ) + schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", 
"STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -503,34 +1022,51 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'fixed_countries': len(self.selected_country_ids), - 'no_gaps' : True, - 'layer' : 'gold', - 'columns' : 'id + name + value (Looker Studio ready)' + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'baseline_year' : self.baseline_year, + 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'norm_scale' : '1-100 per indicator global minmax direction-aware', + 'framework_assignment' : 'per-row by year (not per-indicator)', + 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS), + 'condition_thresholds' : { + 'bad' : f'< {THRESHOLD_BAD}', + 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', + 'good' : f'> {THRESHOLD_GOOD}', + }, }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators': int(self.df_clean['indicator_id'].nunique()) + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), + 'sdg_start_year' : self.sdg_start_year, + 'framework_dist_rows' : fw_dist_rows.to_dict(), + 'framework_dist_inds' : 
ind_fw_label.to_dict(), }) } save_etl_metadata(self.client, metadata) - self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") - self.logger.info(f" Metadata → [AUDIT] etl_metadata") + self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold") return rows_loaded except Exception as e: self.logger.error(f"Error saving: {e}") raise + # ------------------------------------------------------------------ + # RUN + # ------------------------------------------------------------------ + def run(self): self.pipeline_start = datetime.now() self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold") + self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") + self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") + self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)") + self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)") + self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") self.logger.info("=" * 80) self.load_source_data() @@ -538,7 +1074,10 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() + self.determine_sdg_start_year() # Step 6: per-row framework assignment self.verify_no_gaps() + self.calculate_norm_value() # Step 8: norm_value_1_100 + self.calculate_yoy() # Step 9: yoy_change, yoy_pct self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -548,11 +1087,12 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" Countries : 
{len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -560,10 +1100,6 @@ class AnalyticalLayerLoader: # ============================================================================= def run_analytical_layer(): - """ - Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims. - Dipanggil setelah dimensional_model_to_gold selesai. - """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() loader = AnalyticalLayerLoader(client) @@ -577,7 +1113,11 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("Output: fact_asean_food_security_selected → fs_asean_gold") + print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") + print("Output: fact_asean_food_security_selected -> fs_asean_gold") + print(f"Norm: min-max 1-100 per indicator, direction-aware") + print(f"Framework: per-row by year | SDG Proxy: FIES only") + print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") print("=" * 80) logger = setup_logging() @@ -587,4 +1127,6 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") + print(f" SDG Start Year : {loader.sdg_start_year}") + print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file